~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testtransport.py

  • Committer: John Arbash Meinel
  • Date: 2005-09-15 21:35:53 UTC
  • mfrom: (907.1.57)
  • mto: (1393.2.1)
  • mto: This revision was merged to the branch mainline in revision 1396.
  • Revision ID: john@arbash-meinel.com-20050915213552-a6c83a5ef1e20897
(broken) Transport work is merged in. Tests do not pass yet.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
 
import os
19
 
import sys
20
 
import stat
21
 
from cStringIO import StringIO
22
 
 
23
 
import bzrlib
24
 
from bzrlib.errors import (NoSuchFile, FileExists,
25
 
                           TransportNotPossible,
26
 
                           ConnectionError,
27
 
                           DependencyNotPresent,
28
 
                           UnsupportedProtocol,
29
 
                           )
30
 
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.transport import (_CoalescedOffset,
32
 
                              _get_protocol_handlers,
33
 
                              _get_transport_modules,
34
 
                              get_transport,
35
 
                              register_lazy_transport,
36
 
                              _set_protocol_handlers,
37
 
                              Transport,
38
 
                              )
39
 
from bzrlib.transport.memory import MemoryTransport
40
 
from bzrlib.transport.local import LocalTransport
41
 
 
42
 
 
43
 
class TestTransport(TestCase):
44
 
    """Test the non transport-concrete class functionality."""
45
 
 
46
 
    def test__get_set_protocol_handlers(self):
47
 
        handlers = _get_protocol_handlers()
48
 
        self.assertNotEqual({}, handlers)
49
 
        try:
50
 
            _set_protocol_handlers({})
51
 
            self.assertEqual({}, _get_protocol_handlers())
52
 
        finally:
53
 
            _set_protocol_handlers(handlers)
54
 
 
55
 
    def test_get_transport_modules(self):
56
 
        handlers = _get_protocol_handlers()
57
 
        class SampleHandler(object):
58
 
            """I exist, isnt that enough?"""
59
 
        try:
60
 
            my_handlers = {}
61
 
            _set_protocol_handlers(my_handlers)
62
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
63
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
64
 
            self.assertEqual([SampleHandler.__module__],
65
 
                             _get_transport_modules())
66
 
        finally:
67
 
            _set_protocol_handlers(handlers)
68
 
 
69
 
    def test_transport_dependency(self):
70
 
        """Transport with missing dependency causes no error"""
71
 
        saved_handlers = _get_protocol_handlers()
72
 
        try:
73
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
74
 
                    'BadTransportHandler')
75
 
            try:
76
 
                get_transport('foo://fooserver/foo')
77
 
            except UnsupportedProtocol, e:
78
 
                e_str = str(e)
79
 
                self.assertEquals('Unsupported protocol'
80
 
                                  ' for url "foo://fooserver/foo":'
81
 
                                  ' Unable to import library "some_lib":'
82
 
                                  ' testing missing dependency', str(e))
83
 
            else:
84
 
                self.fail('Did not raise UnsupportedProtocol')
85
 
        finally:
86
 
            # restore original values
87
 
            _set_protocol_handlers(saved_handlers)
88
 
            
89
 
    def test_transport_fallback(self):
90
 
        """Transport with missing dependency causes no error"""
91
 
        saved_handlers = _get_protocol_handlers()
92
 
        try:
93
 
            _set_protocol_handlers({})
94
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
95
 
                    'BackupTransportHandler')
96
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
97
 
                    'BadTransportHandler')
98
 
            t = get_transport('foo://fooserver/foo')
99
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
100
 
        finally:
101
 
            _set_protocol_handlers(saved_handlers)
102
 
 
103
 
 
104
 
class TestCoalesceOffsets(TestCase):
105
 
    
106
 
    def check(self, expected, offsets, limit=0, fudge=0):
107
 
        coalesce = Transport._coalesce_offsets
108
 
        exp = [_CoalescedOffset(*x) for x in expected]
109
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
110
 
        self.assertEqual(exp, out)
111
 
 
112
 
    def test_coalesce_empty(self):
113
 
        self.check([], [])
114
 
 
115
 
    def test_coalesce_simple(self):
116
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
117
 
 
118
 
    def test_coalesce_unrelated(self):
119
 
        self.check([(0, 10, [(0, 10)]),
120
 
                    (20, 10, [(0, 10)]),
121
 
                   ], [(0, 10), (20, 10)])
122
 
            
123
 
    def test_coalesce_unsorted(self):
124
 
        self.check([(20, 10, [(0, 10)]),
125
 
                    (0, 10, [(0, 10)]),
126
 
                   ], [(20, 10), (0, 10)])
127
 
 
128
 
    def test_coalesce_nearby(self):
129
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
130
 
                   [(0, 10), (10, 10)])
131
 
 
132
 
    def test_coalesce_overlapped(self):
133
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
134
 
                   [(0, 10), (5, 10)])
135
 
 
136
 
    def test_coalesce_limit(self):
137
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
138
 
                              (30, 10), (40, 10)]),
139
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
140
 
                              (30, 10), (40, 10)]),
141
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
142
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
143
 
                       (90, 10), (100, 10)],
144
 
                    limit=5)
145
 
 
146
 
    def test_coalesce_no_limit(self):
147
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
148
 
                               (30, 10), (40, 10), (50, 10),
149
 
                               (60, 10), (70, 10), (80, 10),
150
 
                               (90, 10)]),
151
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
152
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
153
 
                       (90, 10), (100, 10)])
154
 
 
155
 
    def test_coalesce_fudge(self):
156
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
157
 
                    (100, 10, [(0, 10),]),
158
 
                   ], [(10, 10), (30, 10), (100, 10)],
159
 
                   fudge=10
160
 
                  )
161
 
 
162
 
 
163
 
class TestMemoryTransport(TestCase):
164
 
 
165
 
    def test_get_transport(self):
166
 
        MemoryTransport()
167
 
 
168
 
    def test_clone(self):
169
 
        transport = MemoryTransport()
170
 
        self.assertTrue(isinstance(transport, MemoryTransport))
171
 
        self.assertEqual("memory:///", transport.clone("/").base)
172
 
 
173
 
    def test_abspath(self):
174
 
        transport = MemoryTransport()
175
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
176
 
 
177
 
    def test_abspath_of_root(self):
178
 
        transport = MemoryTransport()
179
 
        self.assertEqual("memory:///", transport.base)
180
 
        self.assertEqual("memory:///", transport.abspath('/'))
181
 
 
182
 
    def test_relpath(self):
183
 
        transport = MemoryTransport()
184
 
 
185
 
    def test_append_and_get(self):
186
 
        transport = MemoryTransport()
187
 
        transport.append_bytes('path', 'content')
188
 
        self.assertEqual(transport.get('path').read(), 'content')
189
 
        transport.append_file('path', StringIO('content'))
190
 
        self.assertEqual(transport.get('path').read(), 'contentcontent')
191
 
 
192
 
    def test_put_and_get(self):
193
 
        transport = MemoryTransport()
194
 
        transport.put_file('path', StringIO('content'))
195
 
        self.assertEqual(transport.get('path').read(), 'content')
196
 
        transport.put_bytes('path', 'content')
197
 
        self.assertEqual(transport.get('path').read(), 'content')
198
 
 
199
 
    def test_append_without_dir_fails(self):
200
 
        transport = MemoryTransport()
201
 
        self.assertRaises(NoSuchFile,
202
 
                          transport.append_bytes, 'dir/path', 'content')
203
 
 
204
 
    def test_put_without_dir_fails(self):
205
 
        transport = MemoryTransport()
206
 
        self.assertRaises(NoSuchFile,
207
 
                          transport.put_file, 'dir/path', StringIO('content'))
208
 
 
209
 
    def test_get_missing(self):
210
 
        transport = MemoryTransport()
211
 
        self.assertRaises(NoSuchFile, transport.get, 'foo')
212
 
 
213
 
    def test_has_missing(self):
214
 
        transport = MemoryTransport()
215
 
        self.assertEquals(False, transport.has('foo'))
216
 
 
217
 
    def test_has_present(self):
218
 
        transport = MemoryTransport()
219
 
        transport.append_bytes('foo', 'content')
220
 
        self.assertEquals(True, transport.has('foo'))
221
 
 
222
 
    def test_mkdir(self):
223
 
        transport = MemoryTransport()
224
 
        transport.mkdir('dir')
225
 
        transport.append_bytes('dir/path', 'content')
226
 
        self.assertEqual(transport.get('dir/path').read(), 'content')
227
 
 
228
 
    def test_mkdir_missing_parent(self):
229
 
        transport = MemoryTransport()
230
 
        self.assertRaises(NoSuchFile,
231
 
                          transport.mkdir, 'dir/dir')
232
 
 
233
 
    def test_mkdir_twice(self):
234
 
        transport = MemoryTransport()
235
 
        transport.mkdir('dir')
236
 
        self.assertRaises(FileExists, transport.mkdir, 'dir')
237
 
 
238
 
    def test_parameters(self):
239
 
        transport = MemoryTransport()
240
 
        self.assertEqual(True, transport.listable())
241
 
        self.assertEqual(False, transport.should_cache())
242
 
        self.assertEqual(False, transport.is_readonly())
243
 
 
244
 
    def test_iter_files_recursive(self):
245
 
        transport = MemoryTransport()
246
 
        transport.mkdir('dir')
247
 
        transport.put_bytes('dir/foo', 'content')
248
 
        transport.put_bytes('dir/bar', 'content')
249
 
        transport.put_bytes('bar', 'content')
250
 
        paths = set(transport.iter_files_recursive())
251
 
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
252
 
 
253
 
    def test_stat(self):
254
 
        transport = MemoryTransport()
255
 
        transport.put_bytes('foo', 'content')
256
 
        transport.put_bytes('bar', 'phowar')
257
 
        self.assertEqual(7, transport.stat('foo').st_size)
258
 
        self.assertEqual(6, transport.stat('bar').st_size)
259
 
 
260
 
        
261
 
class ReadonlyDecoratorTransportTest(TestCase):
262
 
    """Readonly decoration specific tests."""
263
 
 
264
 
    def test_local_parameters(self):
265
 
        import bzrlib.transport.readonly as readonly
266
 
        # connect to . in readonly mode
267
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
268
 
        self.assertEqual(True, transport.listable())
269
 
        self.assertEqual(False, transport.should_cache())
270
 
        self.assertEqual(True, transport.is_readonly())
271
 
 
272
 
    def test_http_parameters(self):
273
 
        import bzrlib.transport.readonly as readonly
274
 
        from bzrlib.transport.http import HttpServer
275
 
        # connect to . via http which is not listable
276
 
        server = HttpServer()
277
 
        server.setUp()
278
 
        try:
279
 
            transport = get_transport('readonly+' + server.get_url())
280
 
            self.failUnless(isinstance(transport,
281
 
                                       readonly.ReadonlyTransportDecorator))
282
 
            self.assertEqual(False, transport.listable())
283
 
            self.assertEqual(True, transport.should_cache())
284
 
            self.assertEqual(True, transport.is_readonly())
285
 
        finally:
286
 
            server.tearDown()
287
 
 
288
 
 
289
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
290
 
    """NFS decorator specific tests."""
291
 
 
292
 
    def get_nfs_transport(self, url):
293
 
        import bzrlib.transport.fakenfs as fakenfs
294
 
        # connect to url with nfs decoration
295
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
296
 
 
297
 
    def test_local_parameters(self):
298
 
        # the listable, should_cache and is_readonly parameters
299
 
        # are not changed by the fakenfs decorator
300
 
        transport = self.get_nfs_transport('.')
301
 
        self.assertEqual(True, transport.listable())
302
 
        self.assertEqual(False, transport.should_cache())
303
 
        self.assertEqual(False, transport.is_readonly())
304
 
 
305
 
    def test_http_parameters(self):
306
 
        # the listable, should_cache and is_readonly parameters
307
 
        # are not changed by the fakenfs decorator
308
 
        from bzrlib.transport.http import HttpServer
309
 
        # connect to . via http which is not listable
310
 
        server = HttpServer()
311
 
        server.setUp()
312
 
        try:
313
 
            transport = self.get_nfs_transport(server.get_url())
314
 
            self.assertIsInstance(
315
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
316
 
            self.assertEqual(False, transport.listable())
317
 
            self.assertEqual(True, transport.should_cache())
318
 
            self.assertEqual(True, transport.is_readonly())
319
 
        finally:
320
 
            server.tearDown()
321
 
 
322
 
    def test_fakenfs_server_default(self):
323
 
        # a FakeNFSServer() should bring up a local relpath server for itself
324
 
        import bzrlib.transport.fakenfs as fakenfs
325
 
        server = fakenfs.FakeNFSServer()
326
 
        server.setUp()
327
 
        try:
328
 
            # the server should be a relpath localhost server
329
 
            self.assertEqual(server.get_url(), 'fakenfs+.')
330
 
            # and we should be able to get a transport for it
331
 
            transport = get_transport(server.get_url())
332
 
            # which must be a FakeNFSTransportDecorator instance.
333
 
            self.assertIsInstance(
334
 
                transport, fakenfs.FakeNFSTransportDecorator)
335
 
        finally:
336
 
            server.tearDown()
337
 
 
338
 
    def test_fakenfs_rename_semantics(self):
339
 
        # a FakeNFS transport must mangle the way rename errors occur to
340
 
        # look like NFS problems.
341
 
        transport = self.get_nfs_transport('.')
342
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
343
 
                        transport=transport)
344
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
345
 
                          transport.rename, 'from', 'to')
346
 
 
347
 
 
348
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
349
 
    """Tests for simulation of VFAT restrictions"""
350
 
 
351
 
    def get_vfat_transport(self, url):
352
 
        """Return vfat-backed transport for test directory"""
353
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
354
 
        return FakeVFATTransportDecorator('vfat+' + url)
355
 
 
356
 
    def test_transport_creation(self):
357
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
358
 
        transport = self.get_vfat_transport('.')
359
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
360
 
 
361
 
    def test_transport_mkdir(self):
362
 
        transport = self.get_vfat_transport('.')
363
 
        transport.mkdir('HELLO')
364
 
        self.assertTrue(transport.has('hello'))
365
 
        self.assertTrue(transport.has('Hello'))
366
 
 
367
 
    def test_forbidden_chars(self):
368
 
        transport = self.get_vfat_transport('.')
369
 
        self.assertRaises(ValueError, transport.has, "<NU>")
370
 
 
371
 
 
372
 
class BadTransportHandler(Transport):
373
 
    def __init__(self, base_url):
374
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
375
 
 
376
 
 
377
 
class BackupTransportHandler(Transport):
378
 
    """Test transport that works as a backup for the BadTransportHandler"""
379
 
    pass
380
 
 
381
 
 
382
 
class TestTransportImplementation(TestCaseInTempDir):
383
 
    """Implementation verification for transports.
384
 
    
385
 
    To verify a transport we need a server factory, which is a callable
386
 
    that accepts no parameters and returns an implementation of
387
 
    bzrlib.transport.Server.
388
 
    
389
 
    That Server is then used to construct transport instances and test
390
 
    the transport via loopback activity.
391
 
 
392
 
    Currently this assumes that the Transport object is connected to the 
393
 
    current working directory.  So that whatever is done 
394
 
    through the transport, should show up in the working 
395
 
    directory, and vice-versa. This is a bug, because its possible to have
396
 
    URL schemes which provide access to something that may not be 
397
 
    result in storage on the local disk, i.e. due to file system limits, or 
398
 
    due to it being a database or some other non-filesystem tool.
 
18
from bzrlib.selftest import TestCaseInTempDir
 
19
 
 
20
def test_transport(tester, t, readonly=False):
 
21
    """Test a transport object. Basically, it assumes that the
 
22
    Transport object is connected to the current working directory.
 
23
    So that whatever is done through the transport, should show
 
24
    up in the working directory, and vice-versa.
399
25
 
400
26
    This also tests to make sure that the functions work with both
401
27
    generators and lists (assuming iter(list) is effectively a generator)
402
28
    """
403
 
    
404
 
    def setUp(self):
405
 
        super(TestTransportImplementation, self).setUp()
406
 
        self._server = self.transport_server()
407
 
        self._server.setUp()
408
 
 
409
 
    def tearDown(self):
410
 
        super(TestTransportImplementation, self).tearDown()
411
 
        self._server.tearDown()
412
 
        
413
 
    def get_transport(self):
414
 
        """Return a connected transport to the local directory."""
415
 
        base_url = self._server.get_url()
416
 
        # try getting the transport via the regular interface:
417
 
        t = get_transport(base_url)
418
 
        if not isinstance(t, self.transport_class): 
419
 
            # we did not get the correct transport class type. Override the
420
 
            # regular connection behaviour by direct construction.
421
 
            t = self.transport_class(base_url)
422
 
        return t
 
29
    import tempfile, os
 
30
    from bzrlib.transport.local import LocalTransport
 
31
 
 
32
    # Test has
 
33
    files = ['a', 'b', 'e', 'g']
 
34
    tester.build_tree(files)
 
35
    tester.assertEqual(t.has('a'), True)
 
36
    tester.assertEqual(t.has('c'), False)
 
37
    tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
38
            [True, True, False, False, True, False, True, False])
 
39
    tester.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
40
            [True, True, False, False, True, False, True, False])
 
41
 
 
42
    # Test get
 
43
    tester.assertEqual(t.get('a').read(), open('a').read())
 
44
    content_f = t.get_multi(files)
 
45
    for path,f in zip(files, content_f):
 
46
        tester.assertEqual(open(path).read(), f.read())
 
47
 
 
48
    content_f = t.get_multi(iter(files))
 
49
    for path,f in zip(files, content_f):
 
50
        tester.assertEqual(open(path).read(), f.read())
 
51
 
 
52
    tester.assertRaises(NoSuchFile, t.get, 'c')
 
53
    try:
 
54
        files = list(t.get_multi(['a', 'b', 'c']))
 
55
    except NoSuchFile:
 
56
        pass
 
57
    else:
 
58
        tester.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
59
    try:
 
60
        files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
61
    except NoSuchFile:
 
62
        pass
 
63
    else:
 
64
        tester.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
65
 
 
66
    # Test put
 
67
    if readonly:
 
68
        open('c', 'wb').write('some text for c\n')
 
69
    else:
 
70
        t.put('c', 'some text for c\n')
 
71
    tester.assert_(os.path.exists('c'))
 
72
    tester.assertEqual(open('c').read(), 'some text for c\n')
 
73
    tester.assertEqual(t.get('c').read(), 'some text for c\n')
 
74
    # Make sure 'has' is updated
 
75
    tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
76
            [True, True, True, False, True, False, True, False])
 
77
    if readonly:
 
78
        open('a', 'wb').write('new\ncontents for\na\n')
 
79
        open('d', 'wb').write('contents\nfor d\n')
 
80
    else:
 
81
        # Put also replaces contents
 
82
        tester.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
83
                                      ('d', 'contents\nfor d\n')]),
 
84
                         2)
 
85
    tester.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
86
            [True, True, True, True, True, False, True, False])
 
87
    tester.assertEqual(open('a').read(), 'new\ncontents for\na\n')
 
88
    tester.assertEqual(open('d').read(), 'contents\nfor d\n')
 
89
 
 
90
    if readonly:
 
91
        open('a', 'wb').write('diff\ncontents for\na\n')
 
92
        open('d', 'wb').write('another contents\nfor d\n')
 
93
    else:
 
94
        tester.assertEqual(
 
95
            t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
96
                              ('d', 'another contents\nfor d\n')]))
 
97
                         , 2)
 
98
    tester.assertEqual(open('a').read(), 'diff\ncontents for\na\n')
 
99
    tester.assertEqual(open('d').read(), 'another contents\nfor d\n')
 
100
 
 
101
    if not readonly:
 
102
        tester.assertRaises(NoSuchFile, t.put, 'path/doesnt/exist/c', 'contents')
 
103
 
 
104
    # Test mkdir
 
105
    os.mkdir('dir_a')
 
106
    tester.assertEqual(t.has('dir_a'), True)
 
107
    tester.assertEqual(t.has('dir_b'), False)
 
108
 
 
109
    if readonly:
 
110
        os.mkdir('dir_b')
 
111
    else:
 
112
        t.mkdir('dir_b')
 
113
    tester.assertEqual(t.has('dir_b'), True)
 
114
    tester.assert_(os.path.isdir('dir_b'))
 
115
 
 
116
    if readonly:
 
117
        os.mkdir('dir_c')
 
118
        os.mkdir('dir_d')
 
119
    else:
 
120
        t.mkdir_multi(['dir_c', 'dir_d'])
 
121
    tester.assertEqual(list(t.has_multi(['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_b'])),
 
122
            [True, True, True, True, False, True])
 
123
    for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d']:
 
124
        tester.assert_(os.path.isdir(d))
 
125
 
 
126
    if not readonly:
 
127
        tester.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
128
        tester.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
129
 
 
130
    # Make sure the transport recognizes when a
 
131
    # directory is created by other means
 
132
    # Caching Transports will fail, because dir_e was already seen not
 
133
    # to exist. So instead, we will search for a new directory
 
134
    #os.mkdir('dir_e')
 
135
    #if not readonly:
 
136
    #    tester.assertRaises(FileExists, t.mkdir, 'dir_e')
 
137
 
 
138
    os.mkdir('dir_f')
 
139
    if not readonly:
 
140
        tester.assertRaises(FileExists, t.mkdir, 'dir_f')
 
141
 
 
142
    # Test get/put in sub-directories
 
143
    if readonly:
 
144
        open('dir_a/a', 'wb').write('contents of dir_a/a')
 
145
        open('dir_b/b', 'wb').write('contents of dir_b/b')
 
146
    else:
 
147
        tester.assertEqual(
 
148
            t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
149
                         ('dir_b/b', 'contents of dir_b/b')])
 
150
                      , 2)
 
151
    for f in ('dir_a/a', 'dir_b/b'):
 
152
        tester.assertEqual(t.get(f).read(), open(f).read())
 
153
 
 
154
    # Test copy_to
 
155
    dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
156
    dtmp_base = os.path.basename(dtmp)
 
157
    local_t = LocalTransport(dtmp)
 
158
 
 
159
    files = ['a', 'b', 'c', 'd']
 
160
    t.copy_to(files, local_t)
 
161
    for f in files:
 
162
        tester.assertEquals(open(f).read(), open(os.path.join(dtmp_base, f)).read())
 
163
 
 
164
    # TODO: Test append
 
165
    # TODO: Make sure all entries support file-like objects as well as strings.
 
166
 
 
167
class LocalTransportTest(TestCaseInTempDir):
 
168
    def test_local_transport(self):
 
169
        from bzrlib.transport.local import LocalTransport
 
170
 
 
171
        t = LocalTransport('.')
 
172
        test_transport(self, t)
 
173
 
 
174
class HttpServer(object):
 
175
    """This just encapsulates spawning and stopping
 
176
    an httpserver.
 
177
    """
 
178
    def __init__(self):
 
179
        """This just spawns a separate process to serve files from
 
180
        this directory. Call the .stop() function to kill the
 
181
        process.
 
182
        """
 
183
        from BaseHTTPServer import HTTPServer
 
184
        from SimpleHTTPServer import SimpleHTTPRequestHandler
 
185
        import os
 
186
        if hasattr(os, 'fork'):
 
187
            self.pid = os.fork()
 
188
            if self.pid != 0:
 
189
                return
 
190
        else: # How do we handle windows, which doesn't have fork?
 
191
            raise NotImplementedError('At present HttpServer cannot fork on Windows')
 
192
 
 
193
            # We might be able to do something like os.spawn() for the
 
194
            # python executable, and give it a simple script to run.
 
195
            # but then how do we kill it?
 
196
 
 
197
        try:
 
198
            self.s = HTTPServer(('', 9999), SimpleHTTPRequestHandler)
 
199
            # TODO: Is there something nicer than killing the server when done?
 
200
            self.s.serve_forever()
 
201
        except KeyboardInterrupt:
 
202
            pass
 
203
        os._exit(0)
 
204
 
 
205
    def stop(self):
 
206
        import os
 
207
        if self.pid is None:
 
208
            return
 
209
        if hasattr(os, 'kill'):
 
210
            import signal
 
211
            os.kill(self.pid, signal.SIGINT)
 
212
            os.waitpid(self.pid, 0)
 
213
            self.pid = None
 
214
        else:
 
215
            raise NotImplementedError('At present HttpServer cannot stop on Windows')
 
216
 
 
217
class HttpTransportTest(TestCaseInTempDir):
 
218
    def test_http_transport(self):
 
219
        from bzrlib.transport.http import HttpTransport
 
220
 
 
221
        s = HttpServer()
 
222
 
 
223
        t = HttpTransport('http://localhost:9999/')
 
224
        test_transport(self, t, readonly=True)
 
225
 
 
226
        s.stop()
 
227