~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

  • Committer: Robert Collins
  • Date: 2006-06-26 16:23:10 UTC
  • mfrom: (1780.2.1 misc-fixen)
  • mto: This revision was merged to the branch mainline in revision 1815.
  • Revision ID: robertc@robertcollins.net-20060626162310-98f5b55b8cc19d46
(robertc) Misc minor typos and the like.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005 by Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
 
18
18
import os
 
19
import sys
 
20
import stat
19
21
from cStringIO import StringIO
20
 
from bzrlib.selftest import TestCaseInTempDir
21
 
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
22
 
from bzrlib.errors import NoSuchFile, FileExists, TransportNotPossible
23
 
 
24
 
def _append(fn, txt):
25
 
    """Append the given text (file-like object) to the supplied filename."""
26
 
    f = open(fn, 'ab')
27
 
    f.write(txt)
28
 
    f.flush()
29
 
    f.close()
30
 
    del f
31
 
 
32
 
class TestTransportMixIn(object):
33
 
    """Subclass this, and it will provide a series of tests for a Transport.
34
 
    It assumes that the Transport object is connected to the 
35
 
    current working directory.  So that whatever is done 
36
 
    through the transport, should show up in the working 
37
 
    directory, and vice-versa.
38
 
 
39
 
    This also tests to make sure that the functions work with both
40
 
    generators and lists (assuming iter(list) is effectively a generator)
41
 
    """
42
 
    readonly = False
43
 
    def get_transport(self):
44
 
        """Children should override this to return the Transport object.
45
 
        """
46
 
        raise NotImplementedError
47
 
 
48
 
    def test_has(self):
49
 
        t = self.get_transport()
50
 
 
51
 
        files = ['a', 'b', 'e', 'g']
52
 
        self.build_tree(files)
53
 
        self.assertEqual(t.has('a'), True)
54
 
        self.assertEqual(t.has('c'), False)
55
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
56
 
                [True, True, False, False, True, False, True, False])
57
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
58
 
                [True, True, False, False, True, False, True, False])
59
 
 
60
 
    def test_get(self):
61
 
        t = self.get_transport()
62
 
 
63
 
        files = ['a', 'b', 'e', 'g']
64
 
        self.build_tree(files)
65
 
        self.assertEqual(t.get('a').read(), open('a').read())
66
 
        content_f = t.get_multi(files)
67
 
        for path,f in zip(files, content_f):
68
 
            self.assertEqual(open(path).read(), f.read())
69
 
 
70
 
        content_f = t.get_multi(iter(files))
71
 
        for path,f in zip(files, content_f):
72
 
            self.assertEqual(open(path).read(), f.read())
73
 
 
74
 
        self.assertRaises(NoSuchFile, t.get, 'c')
75
 
        try:
76
 
            files = list(t.get_multi(['a', 'b', 'c']))
77
 
        except NoSuchFile:
78
 
            pass
79
 
        else:
80
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
81
 
        try:
82
 
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
83
 
        except NoSuchFile:
84
 
            pass
85
 
        else:
86
 
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
87
 
 
88
 
    def test_put(self):
89
 
        t = self.get_transport()
90
 
 
91
 
        if self.readonly:
92
 
            self.assertRaises(TransportNotPossible,
93
 
                    t.put, 'a', 'some text for a\n')
94
 
            open('a', 'wb').write('some text for a\n')
95
 
        else:
96
 
            t.put('a', 'some text for a\n')
97
 
        self.assert_(os.path.exists('a'))
98
 
        self.check_file_contents('a', 'some text for a\n')
99
 
        self.assertEqual(t.get('a').read(), 'some text for a\n')
100
 
        # Make sure 'has' is updated
101
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
102
 
                [True, False, False, False, False])
103
 
        if self.readonly:
104
 
            self.assertRaises(TransportNotPossible,
105
 
                    t.put_multi,
106
 
                    [('a', 'new\ncontents for\na\n'),
107
 
                        ('d', 'contents\nfor d\n')])
108
 
            open('a', 'wb').write('new\ncontents for\na\n')
109
 
            open('d', 'wb').write('contents\nfor d\n')
110
 
        else:
111
 
            # Put also replaces contents
112
 
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
113
 
                                          ('d', 'contents\nfor d\n')]),
114
 
                             2)
115
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
116
 
                [True, False, False, True, False])
117
 
        self.check_file_contents('a', 'new\ncontents for\na\n')
118
 
        self.check_file_contents('d', 'contents\nfor d\n')
119
 
 
120
 
        if self.readonly:
121
 
            self.assertRaises(TransportNotPossible,
122
 
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
123
 
                                  ('d', 'another contents\nfor d\n')]))
124
 
            open('a', 'wb').write('diff\ncontents for\na\n')
125
 
            open('d', 'wb').write('another contents\nfor d\n')
126
 
        else:
127
 
            self.assertEqual(
128
 
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
129
 
                                  ('d', 'another contents\nfor d\n')]))
130
 
                             , 2)
131
 
        self.check_file_contents('a', 'diff\ncontents for\na\n')
132
 
        self.check_file_contents('d', 'another contents\nfor d\n')
133
 
 
134
 
        if self.readonly:
135
 
            self.assertRaises(TransportNotPossible,
136
 
                    t.put, 'path/doesnt/exist/c', 'contents')
137
 
        else:
138
 
            self.assertRaises(NoSuchFile,
139
 
                    t.put, 'path/doesnt/exist/c', 'contents')
140
 
 
141
 
    def test_put_file(self):
142
 
        t = self.get_transport()
143
 
 
144
 
        # Test that StringIO can be used as a file-like object with put
145
 
        f1 = StringIO('this is a string\nand some more stuff\n')
146
 
        if self.readonly:
147
 
            open('f1', 'wb').write(f1.read())
148
 
        else:
149
 
            t.put('f1', f1)
150
 
 
151
 
        del f1
152
 
 
153
 
        self.check_file_contents('f1', 
154
 
                'this is a string\nand some more stuff\n')
155
 
 
156
 
        f2 = StringIO('here is some text\nand a bit more\n')
157
 
        f3 = StringIO('some text for the\nthird file created\n')
158
 
 
159
 
        if self.readonly:
160
 
            open('f2', 'wb').write(f2.read())
161
 
            open('f3', 'wb').write(f3.read())
162
 
        else:
163
 
            t.put_multi([('f2', f2), ('f3', f3)])
164
 
 
165
 
        del f2, f3
166
 
 
167
 
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
168
 
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
169
 
 
170
 
        # Test that an actual file object can be used with put
171
 
        f4 = open('f1', 'rb')
172
 
        if self.readonly:
173
 
            open('f4', 'wb').write(f4.read())
174
 
        else:
175
 
            t.put('f4', f4)
176
 
 
177
 
        del f4
178
 
 
179
 
        self.check_file_contents('f4', 
180
 
                'this is a string\nand some more stuff\n')
181
 
 
182
 
        f5 = open('f2', 'rb')
183
 
        f6 = open('f3', 'rb')
184
 
        if self.readonly:
185
 
            open('f5', 'wb').write(f5.read())
186
 
            open('f6', 'wb').write(f6.read())
187
 
        else:
188
 
            t.put_multi([('f5', f5), ('f6', f6)])
189
 
 
190
 
        del f5, f6
191
 
 
192
 
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
193
 
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
194
 
 
195
 
 
 
22
 
 
23
import bzrlib
 
24
from bzrlib.errors import (NoSuchFile, FileExists,
 
25
                           TransportNotPossible,
 
26
                           ConnectionError,
 
27
                           DependencyNotPresent,
 
28
                           InvalidURL,
 
29
                           )
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
31
from bzrlib.transport import (_get_protocol_handlers,
 
32
                              _get_transport_modules,
 
33
                              get_transport,
 
34
                              register_lazy_transport,
 
35
                              _set_protocol_handlers,
 
36
                              Transport,
 
37
                              )
 
38
from bzrlib.transport.memory import MemoryTransport
 
39
from bzrlib.transport.local import LocalTransport
 
40
 
 
41
 
 
42
class TestTransport(TestCase):
 
43
    """Test the non transport-concrete class functionality."""
 
44
 
 
45
    def test__get_set_protocol_handlers(self):
 
46
        handlers = _get_protocol_handlers()
 
47
        self.assertNotEqual({}, handlers)
 
48
        try:
 
49
            _set_protocol_handlers({})
 
50
            self.assertEqual({}, _get_protocol_handlers())
 
51
        finally:
 
52
            _set_protocol_handlers(handlers)
 
53
 
 
54
    def test_get_transport_modules(self):
 
55
        handlers = _get_protocol_handlers()
 
56
        class SampleHandler(object):
 
57
            """I exist, isnt that enough?"""
 
58
        try:
 
59
            my_handlers = {}
 
60
            _set_protocol_handlers(my_handlers)
 
61
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
62
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
 
63
            self.assertEqual([SampleHandler.__module__],
 
64
                             _get_transport_modules())
 
65
        finally:
 
66
            _set_protocol_handlers(handlers)
 
67
 
 
68
    def test_transport_dependency(self):
 
69
        """Transport with missing dependency causes no error"""
 
70
        saved_handlers = _get_protocol_handlers()
 
71
        try:
 
72
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
73
                    'BadTransportHandler')
 
74
            # TODO: jam 20060427 Now we get InvalidURL because it looks like 
 
75
            #       a URL but we have no support for it.
 
76
            #       Is it better to always fall back to LocalTransport?
 
77
            #       I think this is a better error than a future NoSuchFile
 
78
            self.assertRaises(InvalidURL, get_transport, 'foo://fooserver/foo')
 
79
        finally:
 
80
            # restore original values
 
81
            _set_protocol_handlers(saved_handlers)
 
82
            
 
83
    def test_transport_fallback(self):
 
84
        """Transport with missing dependency causes no error"""
 
85
        saved_handlers = _get_protocol_handlers()
 
86
        try:
 
87
            _set_protocol_handlers({})
 
88
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
89
                    'BackupTransportHandler')
 
90
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
 
91
                    'BadTransportHandler')
 
92
            t = get_transport('foo://fooserver/foo')
 
93
            self.assertTrue(isinstance(t, BackupTransportHandler))
 
94
        finally:
 
95
            _set_protocol_handlers(saved_handlers)
 
96
            
 
97
 
 
98
class TestMemoryTransport(TestCase):
 
99
 
 
100
    def test_get_transport(self):
 
101
        MemoryTransport()
 
102
 
 
103
    def test_clone(self):
 
104
        transport = MemoryTransport()
 
105
        self.assertTrue(isinstance(transport, MemoryTransport))
 
106
 
 
107
    def test_abspath(self):
 
108
        transport = MemoryTransport()
 
109
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
 
110
 
 
111
    def test_relpath(self):
 
112
        transport = MemoryTransport()
 
113
 
 
114
    def test_append_and_get(self):
 
115
        transport = MemoryTransport()
 
116
        transport.append('path', StringIO('content'))
 
117
        self.assertEqual(transport.get('path').read(), 'content')
 
118
        transport.append('path', StringIO('content'))
 
119
        self.assertEqual(transport.get('path').read(), 'contentcontent')
 
120
 
 
121
    def test_put_and_get(self):
 
122
        transport = MemoryTransport()
 
123
        transport.put('path', StringIO('content'))
 
124
        self.assertEqual(transport.get('path').read(), 'content')
 
125
        transport.put('path', StringIO('content'))
 
126
        self.assertEqual(transport.get('path').read(), 'content')
 
127
 
 
128
    def test_append_without_dir_fails(self):
 
129
        transport = MemoryTransport()
 
130
        self.assertRaises(NoSuchFile,
 
131
                          transport.append, 'dir/path', StringIO('content'))
 
132
 
 
133
    def test_put_without_dir_fails(self):
 
134
        transport = MemoryTransport()
 
135
        self.assertRaises(NoSuchFile,
 
136
                          transport.put, 'dir/path', StringIO('content'))
 
137
 
 
138
    def test_get_missing(self):
 
139
        transport = MemoryTransport()
 
140
        self.assertRaises(NoSuchFile, transport.get, 'foo')
 
141
 
 
142
    def test_has_missing(self):
 
143
        transport = MemoryTransport()
 
144
        self.assertEquals(False, transport.has('foo'))
 
145
 
 
146
    def test_has_present(self):
 
147
        transport = MemoryTransport()
 
148
        transport.append('foo', StringIO('content'))
 
149
        self.assertEquals(True, transport.has('foo'))
196
150
 
197
151
    def test_mkdir(self):
198
 
        t = self.get_transport()
199
 
 
200
 
        # Test mkdir
201
 
        os.mkdir('dir_a')
202
 
        self.assertEqual(t.has('dir_a'), True)
203
 
        self.assertEqual(t.has('dir_b'), False)
204
 
 
205
 
        if self.readonly:
206
 
            self.assertRaises(TransportNotPossible,
207
 
                    t.mkdir, 'dir_b')
208
 
            os.mkdir('dir_b')
209
 
        else:
210
 
            t.mkdir('dir_b')
211
 
        self.assertEqual(t.has('dir_b'), True)
212
 
        self.assert_(os.path.isdir('dir_b'))
213
 
 
214
 
        if self.readonly:
215
 
            self.assertRaises(TransportNotPossible,
216
 
                    t.mkdir_multi, ['dir_c', 'dir_d'])
217
 
            os.mkdir('dir_c')
218
 
            os.mkdir('dir_d')
219
 
        else:
220
 
            t.mkdir_multi(['dir_c', 'dir_d'])
221
 
 
222
 
        if self.readonly:
223
 
            self.assertRaises(TransportNotPossible,
224
 
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
225
 
            os.mkdir('dir_e')
226
 
            os.mkdir('dir_f')
227
 
        else:
228
 
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
229
 
        self.assertEqual(list(t.has_multi(
230
 
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
231
 
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
232
 
            [True, True, True, False,
233
 
             True, True, True, True])
234
 
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
235
 
            self.assert_(os.path.isdir(d))
236
 
 
237
 
        if not self.readonly:
238
 
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
239
 
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
240
 
 
241
 
        # Make sure the transport recognizes when a
242
 
        # directory is created by other means
243
 
        # Caching Transports will fail, because dir_e was already seen not
244
 
        # to exist. So instead, we will search for a new directory
245
 
        #os.mkdir('dir_e')
246
 
        #if not self.readonly:
247
 
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
248
 
 
249
 
        os.mkdir('dir_g')
250
 
        if not self.readonly:
251
 
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
252
 
 
253
 
        # Test get/put in sub-directories
254
 
        if self.readonly:
255
 
            open('dir_a/a', 'wb').write('contents of dir_a/a')
256
 
            open('dir_b/b', 'wb').write('contents of dir_b/b')
257
 
        else:
258
 
            self.assertEqual(
259
 
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
260
 
                             ('dir_b/b', 'contents of dir_b/b')])
261
 
                          , 2)
262
 
        for f in ('dir_a/a', 'dir_b/b'):
263
 
            self.assertEqual(t.get(f).read(), open(f).read())
264
 
 
265
 
    def test_copy_to(self):
266
 
        import tempfile
267
 
        from bzrlib.transport.local import LocalTransport
268
 
 
269
 
        t = self.get_transport()
270
 
 
271
 
        files = ['a', 'b', 'c', 'd']
272
 
        self.build_tree(files)
273
 
 
274
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
275
 
        dtmp_base = os.path.basename(dtmp)
276
 
        local_t = LocalTransport(dtmp)
277
 
 
278
 
        t.copy_to(files, local_t)
279
 
        for f in files:
280
 
            self.assertEquals(open(f).read(),
281
 
                    open(os.path.join(dtmp_base, f)).read())
282
 
 
283
 
        del dtmp, dtmp_base, local_t
284
 
 
285
 
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
286
 
        dtmp_base = os.path.basename(dtmp)
287
 
        local_t = LocalTransport(dtmp)
288
 
 
289
 
        files = ['a', 'b', 'c', 'd']
290
 
        t.copy_to(iter(files), local_t)
291
 
        for f in files:
292
 
            self.assertEquals(open(f).read(),
293
 
                    open(os.path.join(dtmp_base, f)).read())
294
 
 
295
 
        del dtmp, dtmp_base, local_t
296
 
 
297
 
    def test_append(self):
298
 
        t = self.get_transport()
299
 
 
300
 
        if self.readonly:
301
 
            open('a', 'wb').write('diff\ncontents for\na\n')
302
 
            open('b', 'wb').write('contents\nfor b\n')
303
 
        else:
304
 
            t.put_multi([
305
 
                    ('a', 'diff\ncontents for\na\n'),
306
 
                    ('b', 'contents\nfor b\n')
307
 
                    ])
308
 
 
309
 
        if self.readonly:
310
 
            self.assertRaises(TransportNotPossible,
311
 
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
312
 
            _append('a', 'add\nsome\nmore\ncontents\n')
313
 
        else:
314
 
            t.append('a', 'add\nsome\nmore\ncontents\n')
315
 
 
316
 
        self.check_file_contents('a', 
317
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
318
 
 
319
 
        if self.readonly:
320
 
            self.assertRaises(TransportNotPossible,
321
 
                    t.append_multi,
322
 
                        [('a', 'and\nthen\nsome\nmore\n'),
323
 
                         ('b', 'some\nmore\nfor\nb\n')])
324
 
            _append('a', 'and\nthen\nsome\nmore\n')
325
 
            _append('b', 'some\nmore\nfor\nb\n')
326
 
        else:
327
 
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
328
 
                    ('b', 'some\nmore\nfor\nb\n')])
329
 
        self.check_file_contents('a', 
330
 
            'diff\ncontents for\na\n'
331
 
            'add\nsome\nmore\ncontents\n'
332
 
            'and\nthen\nsome\nmore\n')
333
 
        self.check_file_contents('b', 
334
 
                'contents\nfor b\n'
335
 
                'some\nmore\nfor\nb\n')
336
 
 
337
 
        if self.readonly:
338
 
            _append('a', 'a little bit more\n')
339
 
            _append('b', 'from an iterator\n')
340
 
        else:
341
 
            t.append_multi(iter([('a', 'a little bit more\n'),
342
 
                    ('b', 'from an iterator\n')]))
343
 
        self.check_file_contents('a', 
344
 
            'diff\ncontents for\na\n'
345
 
            'add\nsome\nmore\ncontents\n'
346
 
            'and\nthen\nsome\nmore\n'
347
 
            'a little bit more\n')
348
 
        self.check_file_contents('b', 
349
 
                'contents\nfor b\n'
350
 
                'some\nmore\nfor\nb\n'
351
 
                'from an iterator\n')
352
 
 
353
 
    def test_append_file(self):
354
 
        t = self.get_transport()
355
 
 
356
 
        contents = [
357
 
            ('f1', 'this is a string\nand some more stuff\n'),
358
 
            ('f2', 'here is some text\nand a bit more\n'),
359
 
            ('f3', 'some text for the\nthird file created\n'),
360
 
            ('f4', 'this is a string\nand some more stuff\n'),
361
 
            ('f5', 'here is some text\nand a bit more\n'),
362
 
            ('f6', 'some text for the\nthird file created\n')
363
 
        ]
 
152
        transport = MemoryTransport()
 
153
        transport.mkdir('dir')
 
154
        transport.append('dir/path', StringIO('content'))
 
155
        self.assertEqual(transport.get('dir/path').read(), 'content')
 
156
 
 
157
    def test_mkdir_missing_parent(self):
 
158
        transport = MemoryTransport()
 
159
        self.assertRaises(NoSuchFile,
 
160
                          transport.mkdir, 'dir/dir')
 
161
 
 
162
    def test_mkdir_twice(self):
 
163
        transport = MemoryTransport()
 
164
        transport.mkdir('dir')
 
165
        self.assertRaises(FileExists, transport.mkdir, 'dir')
 
166
 
 
167
    def test_parameters(self):
 
168
        transport = MemoryTransport()
 
169
        self.assertEqual(True, transport.listable())
 
170
        self.assertEqual(False, transport.should_cache())
 
171
        self.assertEqual(False, transport.is_readonly())
 
172
 
 
173
    def test_iter_files_recursive(self):
 
174
        transport = MemoryTransport()
 
175
        transport.mkdir('dir')
 
176
        transport.put('dir/foo', StringIO('content'))
 
177
        transport.put('dir/bar', StringIO('content'))
 
178
        transport.put('bar', StringIO('content'))
 
179
        paths = set(transport.iter_files_recursive())
 
180
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
 
181
 
 
182
    def test_stat(self):
 
183
        transport = MemoryTransport()
 
184
        transport.put('foo', StringIO('content'))
 
185
        transport.put('bar', StringIO('phowar'))
 
186
        self.assertEqual(7, transport.stat('foo').st_size)
 
187
        self.assertEqual(6, transport.stat('bar').st_size)
 
188
 
364
189
        
365
 
        if self.readonly:
366
 
            for f, val in contents:
367
 
                open(f, 'wb').write(val)
368
 
        else:
369
 
            t.put_multi(contents)
370
 
 
371
 
        a1 = StringIO('appending to\none\n')
372
 
        if self.readonly:
373
 
            _append('f1', a1.read())
374
 
        else:
375
 
            t.append('f1', a1)
376
 
 
377
 
        del a1
378
 
 
379
 
        self.check_file_contents('f1', 
380
 
                'this is a string\nand some more stuff\n'
381
 
                'appending to\none\n')
382
 
 
383
 
        a2 = StringIO('adding more\ntext to two\n')
384
 
        a3 = StringIO('some garbage\nto put in three\n')
385
 
 
386
 
        if self.readonly:
387
 
            _append('f2', a2.read())
388
 
            _append('f3', a3.read())
389
 
        else:
390
 
            t.append_multi([('f2', a2), ('f3', a3)])
391
 
 
392
 
        del a2, a3
393
 
 
394
 
        self.check_file_contents('f2',
395
 
                'here is some text\nand a bit more\n'
396
 
                'adding more\ntext to two\n')
397
 
        self.check_file_contents('f3', 
398
 
                'some text for the\nthird file created\n'
399
 
                'some garbage\nto put in three\n')
400
 
 
401
 
        # Test that an actual file object can be used with put
402
 
        a4 = open('f1', 'rb')
403
 
        if self.readonly:
404
 
            _append('f4', a4.read())
405
 
        else:
406
 
            t.append('f4', a4)
407
 
 
408
 
        del a4
409
 
 
410
 
        self.check_file_contents('f4', 
411
 
                'this is a string\nand some more stuff\n'
412
 
                'this is a string\nand some more stuff\n'
413
 
                'appending to\none\n')
414
 
 
415
 
        a5 = open('f2', 'rb')
416
 
        a6 = open('f3', 'rb')
417
 
        if self.readonly:
418
 
            _append('f5', a5.read())
419
 
            _append('f6', a6.read())
420
 
        else:
421
 
            t.append_multi([('f5', a5), ('f6', a6)])
422
 
 
423
 
        del a5, a6
424
 
 
425
 
        self.check_file_contents('f5',
426
 
                'here is some text\nand a bit more\n'
427
 
                'here is some text\nand a bit more\n'
428
 
                'adding more\ntext to two\n')
429
 
        self.check_file_contents('f6',
430
 
                'some text for the\nthird file created\n'
431
 
                'some text for the\nthird file created\n'
432
 
                'some garbage\nto put in three\n')
433
 
 
434
 
    def test_get_partial(self):
435
 
        t = self.get_transport()
436
 
 
437
 
        contents = [
438
 
            ('f1', 
439
 
                'here is some text\nand a bit more\n'
440
 
                'adding more\ntext to two\n'),
441
 
            ('f2',
442
 
                'this is a string\nand some more stuff\n'
443
 
                'appending to\none\n'),
444
 
            ('f3',
445
 
                'some text for the\nthird file created\n'
446
 
                'some garbage\nto put in three\n')
447
 
        ]
448
 
        if self.readonly:
449
 
            for f, val in contents:
450
 
                open(f, 'wb').write(val)
451
 
        else:
452
 
            t.put_multi(contents)
453
 
 
454
 
        self.assertRaises(NoSuchFile,
455
 
                t.get_partial, 'a-missing-file', 20)
456
 
        self.assertRaises(NoSuchFile,
457
 
                t.get_partial, 'another-missing-file', 20, 30)
458
 
        f = t.get_partial('f1', 33)
459
 
        self.assertEqual(f.read(), 
460
 
                'adding more\ntext to two\n')
461
 
        f = t.get_partial('f1', 33, 10)
462
 
        self.assertEqual(f.read(10), 
463
 
                'adding mor')
464
 
 
465
 
        del f
466
 
 
467
 
        offsets = [('f2', 37), ('f3', 20, 10), ('f1', 10, 20)]
468
 
        values = ['appending to\none\n',
469
 
                  'ird file c',
470
 
                  'me text\nand a bit mo'
471
 
                 ]
472
 
        contents_f = t.get_partial_multi(offsets)
473
 
        count = 0
474
 
        for f, val in zip(contents_f, values):
475
 
            count += 1
476
 
            self.assertEqual(val, f.read(len(val)))
477
 
        # Make sure we saw all values, and no extra
478
 
        self.assertEqual(len(values), count)
479
 
        self.assertEqual(list(contents_f), [])
480
 
 
481
 
        # Do the same thing with an iterator
482
 
        offsets = iter([('f2', 34), ('f3', 18, 10), ('f1', 15, 15)])
483
 
        values = ['ff\nappending to\none\n',
484
 
                  'third file',
485
 
                  'xt\nand a bit mo'
486
 
                 ]
487
 
        contents_f = t.get_partial_multi(offsets)
488
 
        count = 0
489
 
        for f, val in zip(contents_f, values):
490
 
            count += 1
491
 
            self.assertEqual(val, f.read(len(val)))
492
 
        self.assertEqual(len(values), count)
493
 
        self.assertEqual(list(contents_f), [])
494
 
 
495
 
 
496
 
    def test_delete(self):
497
 
        # TODO: Test Transport.delete
498
 
        pass
499
 
 
500
 
    def test_move(self):
501
 
        # TODO: Test Transport.move
502
 
        pass
503
 
 
504
 
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
505
 
    def get_transport(self):
506
 
        from bzrlib.transport.local import LocalTransport
507
 
        return LocalTransport('.')
508
 
 
509
 
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
510
 
    readonly = True
511
 
    def get_transport(self):
512
 
        from bzrlib.transport.http import HttpTransport
513
 
        url = self.get_remote_url('.')
514
 
        return HttpTransport(url)
515
 
 
 
190
class ReadonlyDecoratorTransportTest(TestCase):
 
191
    """Readonly decoration specific tests."""
 
192
 
 
193
    def test_local_parameters(self):
 
194
        import bzrlib.transport.readonly as readonly
 
195
        # connect to . in readonly mode
 
196
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
 
197
        self.assertEqual(True, transport.listable())
 
198
        self.assertEqual(False, transport.should_cache())
 
199
        self.assertEqual(True, transport.is_readonly())
 
200
 
 
201
    def test_http_parameters(self):
 
202
        import bzrlib.transport.readonly as readonly
 
203
        from bzrlib.transport.http import HttpServer
 
204
        # connect to . via http which is not listable
 
205
        server = HttpServer()
 
206
        server.setUp()
 
207
        try:
 
208
            transport = get_transport('readonly+' + server.get_url())
 
209
            self.failUnless(isinstance(transport,
 
210
                                       readonly.ReadonlyTransportDecorator))
 
211
            self.assertEqual(False, transport.listable())
 
212
            self.assertEqual(True, transport.should_cache())
 
213
            self.assertEqual(True, transport.is_readonly())
 
214
        finally:
 
215
            server.tearDown()
 
216
 
 
217
 
 
218
class FakeNFSDecoratorTests(TestCaseInTempDir):
 
219
    """NFS decorator specific tests."""
 
220
 
 
221
    def get_nfs_transport(self, url):
 
222
        import bzrlib.transport.fakenfs as fakenfs
 
223
        # connect to url with nfs decoration
 
224
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
 
225
 
 
226
    def test_local_parameters(self):
 
227
        # the listable, should_cache and is_readonly parameters
 
228
        # are not changed by the fakenfs decorator
 
229
        transport = self.get_nfs_transport('.')
 
230
        self.assertEqual(True, transport.listable())
 
231
        self.assertEqual(False, transport.should_cache())
 
232
        self.assertEqual(False, transport.is_readonly())
 
233
 
 
234
    def test_http_parameters(self):
 
235
        # the listable, should_cache and is_readonly parameters
 
236
        # are not changed by the fakenfs decorator
 
237
        from bzrlib.transport.http import HttpServer
 
238
        # connect to . via http which is not listable
 
239
        server = HttpServer()
 
240
        server.setUp()
 
241
        try:
 
242
            transport = self.get_nfs_transport(server.get_url())
 
243
            self.assertIsInstance(
 
244
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
 
245
            self.assertEqual(False, transport.listable())
 
246
            self.assertEqual(True, transport.should_cache())
 
247
            self.assertEqual(True, transport.is_readonly())
 
248
        finally:
 
249
            server.tearDown()
 
250
 
 
251
    def test_fakenfs_server_default(self):
 
252
        # a FakeNFSServer() should bring up a local relpath server for itself
 
253
        import bzrlib.transport.fakenfs as fakenfs
 
254
        server = fakenfs.FakeNFSServer()
 
255
        server.setUp()
 
256
        try:
 
257
            # the server should be a relpath localhost server
 
258
            self.assertEqual(server.get_url(), 'fakenfs+.')
 
259
            # and we should be able to get a transport for it
 
260
            transport = get_transport(server.get_url())
 
261
            # which must be a FakeNFSTransportDecorator instance.
 
262
            self.assertIsInstance(
 
263
                transport, fakenfs.FakeNFSTransportDecorator)
 
264
        finally:
 
265
            server.tearDown()
 
266
 
 
267
    def test_fakenfs_rename_semantics(self):
 
268
        # a FakeNFS transport must mangle the way rename errors occur to
 
269
        # look like NFS problems.
 
270
        transport = self.get_nfs_transport('.')
 
271
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
 
272
                        transport=transport)
 
273
        self.assertRaises(bzrlib.errors.ResourceBusy,
 
274
                          transport.rename, 'from', 'to')
 
275
 
 
276
 
 
277
class FakeVFATDecoratorTests(TestCaseInTempDir):
 
278
    """Tests for simulation of VFAT restrictions"""
 
279
 
 
280
    def get_vfat_transport(self, url):
 
281
        """Return vfat-backed transport for test directory"""
 
282
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
283
        return FakeVFATTransportDecorator('vfat+' + url)
 
284
 
 
285
    def test_transport_creation(self):
 
286
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
 
287
        transport = self.get_vfat_transport('.')
 
288
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
 
289
 
 
290
    def test_transport_mkdir(self):
 
291
        transport = self.get_vfat_transport('.')
 
292
        transport.mkdir('HELLO')
 
293
        self.assertTrue(transport.has('hello'))
 
294
        self.assertTrue(transport.has('Hello'))
 
295
 
 
296
    def test_forbidden_chars(self):
 
297
        transport = self.get_vfat_transport('.')
 
298
        self.assertRaises(ValueError, transport.has, "<NU>")
 
299
 
 
300
 
 
301
class BadTransportHandler(Transport):
 
302
    def __init__(self, base_url):
 
303
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
 
304
 
 
305
 
 
306
class BackupTransportHandler(Transport):
 
307
    """Test transport that works as a backup for the BadTransportHandler"""
 
308
    pass