~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Refactor status display code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
18
import os
19
 
import sys
20
 
import stat
21
19
from cStringIO import StringIO
22
20
 
23
 
import bzrlib
24
 
from bzrlib.errors import (NoSuchFile, FileExists,
25
 
                           TransportNotPossible,
26
 
                           ConnectionError,
27
 
                           DependencyNotPresent,
28
 
                           UnsupportedProtocol,
29
 
                           )
 
21
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
 
22
                           ConnectionError)
30
23
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.transport import (_CoalescedOffset,
32
 
                              _get_protocol_handlers,
33
 
                              _get_transport_modules,
34
 
                              get_transport,
35
 
                              register_lazy_transport,
36
 
                              _set_protocol_handlers,
37
 
                              Transport,
38
 
                              )
39
 
from bzrlib.transport.memory import MemoryTransport
40
 
from bzrlib.transport.local import LocalTransport
41
 
 
 
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
25
from bzrlib.transport import memory, urlescape
 
26
 
 
27
 
 
28
def _append(fn, txt):
 
29
    """Append the given text (file-like object) to the supplied filename."""
 
30
    f = open(fn, 'ab')
 
31
    f.write(txt)
 
32
    f.flush()
 
33
    f.close()
 
34
    del f
42
35
 
43
36
class TestTransport(TestCase):
44
37
    """Test the non transport-concrete class functionality."""
45
38
 
46
 
    def test__get_set_protocol_handlers(self):
47
 
        handlers = _get_protocol_handlers()
48
 
        self.assertNotEqual({}, handlers)
49
 
        try:
50
 
            _set_protocol_handlers({})
51
 
            self.assertEqual({}, _get_protocol_handlers())
52
 
        finally:
53
 
            _set_protocol_handlers(handlers)
54
 
 
55
 
    def test_get_transport_modules(self):
56
 
        handlers = _get_protocol_handlers()
57
 
        class SampleHandler(object):
58
 
            """I exist, isnt that enough?"""
59
 
        try:
60
 
            my_handlers = {}
61
 
            _set_protocol_handlers(my_handlers)
62
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
63
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
64
 
            self.assertEqual([SampleHandler.__module__],
65
 
                             _get_transport_modules())
66
 
        finally:
67
 
            _set_protocol_handlers(handlers)
68
 
 
69
 
    def test_transport_dependency(self):
70
 
        """Transport with missing dependency causes no error"""
71
 
        saved_handlers = _get_protocol_handlers()
72
 
        try:
73
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
74
 
                    'BadTransportHandler')
75
 
            try:
76
 
                get_transport('foo://fooserver/foo')
77
 
            except UnsupportedProtocol, e:
78
 
                e_str = str(e)
79
 
                self.assertEquals('Unsupported protocol'
80
 
                                  ' for url "foo://fooserver/foo":'
81
 
                                  ' Unable to import library "some_lib":'
82
 
                                  ' testing missing dependency', str(e))
83
 
            else:
84
 
                self.fail('Did not raise UnsupportedProtocol')
85
 
        finally:
86
 
            # restore original values
87
 
            _set_protocol_handlers(saved_handlers)
88
 
            
89
 
    def test_transport_fallback(self):
90
 
        """Transport with missing dependency causes no error"""
91
 
        saved_handlers = _get_protocol_handlers()
92
 
        try:
93
 
            _set_protocol_handlers({})
94
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
95
 
                    'BackupTransportHandler')
96
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
97
 
                    'BadTransportHandler')
98
 
            t = get_transport('foo://fooserver/foo')
99
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
100
 
        finally:
101
 
            _set_protocol_handlers(saved_handlers)
102
 
 
103
 
 
104
 
class TestCoalesceOffsets(TestCase):
105
 
    
106
 
    def check(self, expected, offsets, limit=0, fudge=0):
107
 
        coalesce = Transport._coalesce_offsets
108
 
        exp = [_CoalescedOffset(*x) for x in expected]
109
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
110
 
        self.assertEqual(exp, out)
111
 
 
112
 
    def test_coalesce_empty(self):
113
 
        self.check([], [])
114
 
 
115
 
    def test_coalesce_simple(self):
116
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
117
 
 
118
 
    def test_coalesce_unrelated(self):
119
 
        self.check([(0, 10, [(0, 10)]),
120
 
                    (20, 10, [(0, 10)]),
121
 
                   ], [(0, 10), (20, 10)])
122
 
            
123
 
    def test_coalesce_unsorted(self):
124
 
        self.check([(20, 10, [(0, 10)]),
125
 
                    (0, 10, [(0, 10)]),
126
 
                   ], [(20, 10), (0, 10)])
127
 
 
128
 
    def test_coalesce_nearby(self):
129
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
130
 
                   [(0, 10), (10, 10)])
131
 
 
132
 
    def test_coalesce_overlapped(self):
133
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
134
 
                   [(0, 10), (5, 10)])
135
 
 
136
 
    def test_coalesce_limit(self):
137
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
138
 
                              (30, 10), (40, 10)]),
139
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
140
 
                              (30, 10), (40, 10)]),
141
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
142
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
143
 
                       (90, 10), (100, 10)],
144
 
                    limit=5)
145
 
 
146
 
    def test_coalesce_no_limit(self):
147
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
148
 
                               (30, 10), (40, 10), (50, 10),
149
 
                               (60, 10), (70, 10), (80, 10),
150
 
                               (90, 10)]),
151
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
152
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
153
 
                       (90, 10), (100, 10)])
154
 
 
155
 
    def test_coalesce_fudge(self):
156
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
157
 
                    (100, 10, [(0, 10),]),
158
 
                   ], [(10, 10), (30, 10), (100, 10)],
159
 
                   fudge=10
160
 
                  )
 
39
    def test_urlescape(self):
 
40
        self.assertEqual('%25', urlescape('%'))
 
41
 
 
42
 
 
43
class TestTransportMixIn(object):
 
44
    """Subclass this, and it will provide a series of tests for a Transport.
 
45
    It assumes that the Transport object is connected to the 
 
46
    current working directory.  So that whatever is done 
 
47
    through the transport, should show up in the working 
 
48
    directory, and vice-versa.
 
49
 
 
50
    This also tests to make sure that the functions work with both
 
51
    generators and lists (assuming iter(list) is effectively a generator)
 
52
    """
 
53
    readonly = False
 
54
    def get_transport(self):
 
55
        """Children should override this to return the Transport object.
 
56
        """
 
57
        raise NotImplementedError
 
58
 
 
59
    def test_has(self):
 
60
        t = self.get_transport()
 
61
 
 
62
        files = ['a', 'b', 'e', 'g', '%']
 
63
        self.build_tree(files)
 
64
        self.assertEqual(t.has('a'), True)
 
65
        self.assertEqual(t.has('c'), False)
 
66
        self.assertEqual(t.has(urlescape('%')), True)
 
67
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
68
                [True, True, False, False, True, False, True, False])
 
69
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
 
70
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
71
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
72
                [True, True, False, False, True, False, True, False])
 
73
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
 
74
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
75
 
 
76
    def test_get(self):
 
77
        t = self.get_transport()
 
78
 
 
79
        files = ['a', 'b', 'e', 'g']
 
80
        self.build_tree(files)
 
81
        self.assertEqual(t.get('a').read(), open('a').read())
 
82
        content_f = t.get_multi(files)
 
83
        for path,f in zip(files, content_f):
 
84
            self.assertEqual(open(path).read(), f.read())
 
85
 
 
86
        content_f = t.get_multi(iter(files))
 
87
        for path,f in zip(files, content_f):
 
88
            self.assertEqual(open(path).read(), f.read())
 
89
 
 
90
        self.assertRaises(NoSuchFile, t.get, 'c')
 
91
        try:
 
92
            files = list(t.get_multi(['a', 'b', 'c']))
 
93
        except NoSuchFile:
 
94
            pass
 
95
        else:
 
96
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
97
        try:
 
98
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
99
        except NoSuchFile:
 
100
            pass
 
101
        else:
 
102
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
103
 
 
104
    def test_put(self):
 
105
        t = self.get_transport()
 
106
 
 
107
        if self.readonly:
 
108
            self.assertRaises(TransportNotPossible,
 
109
                    t.put, 'a', 'some text for a\n')
 
110
            open('a', 'wb').write('some text for a\n')
 
111
        else:
 
112
            t.put('a', 'some text for a\n')
 
113
        self.assert_(os.path.exists('a'))
 
114
        self.check_file_contents('a', 'some text for a\n')
 
115
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
116
        # Make sure 'has' is updated
 
117
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
118
                [True, False, False, False, False])
 
119
        if self.readonly:
 
120
            self.assertRaises(TransportNotPossible,
 
121
                    t.put_multi,
 
122
                    [('a', 'new\ncontents for\na\n'),
 
123
                        ('d', 'contents\nfor d\n')])
 
124
            open('a', 'wb').write('new\ncontents for\na\n')
 
125
            open('d', 'wb').write('contents\nfor d\n')
 
126
        else:
 
127
            # Put also replaces contents
 
128
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
129
                                          ('d', 'contents\nfor d\n')]),
 
130
                             2)
 
131
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
132
                [True, False, False, True, False])
 
133
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
134
        self.check_file_contents('d', 'contents\nfor d\n')
 
135
 
 
136
        if self.readonly:
 
137
            self.assertRaises(TransportNotPossible,
 
138
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
139
                                  ('d', 'another contents\nfor d\n')]))
 
140
            open('a', 'wb').write('diff\ncontents for\na\n')
 
141
            open('d', 'wb').write('another contents\nfor d\n')
 
142
        else:
 
143
            self.assertEqual(
 
144
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
145
                                  ('d', 'another contents\nfor d\n')]))
 
146
                             , 2)
 
147
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
148
        self.check_file_contents('d', 'another contents\nfor d\n')
 
149
 
 
150
        if self.readonly:
 
151
            self.assertRaises(TransportNotPossible,
 
152
                    t.put, 'path/doesnt/exist/c', 'contents')
 
153
        else:
 
154
            self.assertRaises(NoSuchFile,
 
155
                    t.put, 'path/doesnt/exist/c', 'contents')
 
156
 
 
157
    def test_put_file(self):
 
158
        t = self.get_transport()
 
159
 
 
160
        # Test that StringIO can be used as a file-like object with put
 
161
        f1 = StringIO('this is a string\nand some more stuff\n')
 
162
        if self.readonly:
 
163
            open('f1', 'wb').write(f1.read())
 
164
        else:
 
165
            t.put('f1', f1)
 
166
 
 
167
        del f1
 
168
 
 
169
        self.check_file_contents('f1', 
 
170
                'this is a string\nand some more stuff\n')
 
171
 
 
172
        f2 = StringIO('here is some text\nand a bit more\n')
 
173
        f3 = StringIO('some text for the\nthird file created\n')
 
174
 
 
175
        if self.readonly:
 
176
            open('f2', 'wb').write(f2.read())
 
177
            open('f3', 'wb').write(f3.read())
 
178
        else:
 
179
            t.put_multi([('f2', f2), ('f3', f3)])
 
180
 
 
181
        del f2, f3
 
182
 
 
183
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
184
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
185
 
 
186
        # Test that an actual file object can be used with put
 
187
        f4 = open('f1', 'rb')
 
188
        if self.readonly:
 
189
            open('f4', 'wb').write(f4.read())
 
190
        else:
 
191
            t.put('f4', f4)
 
192
 
 
193
        del f4
 
194
 
 
195
        self.check_file_contents('f4', 
 
196
                'this is a string\nand some more stuff\n')
 
197
 
 
198
        f5 = open('f2', 'rb')
 
199
        f6 = open('f3', 'rb')
 
200
        if self.readonly:
 
201
            open('f5', 'wb').write(f5.read())
 
202
            open('f6', 'wb').write(f6.read())
 
203
        else:
 
204
            t.put_multi([('f5', f5), ('f6', f6)])
 
205
 
 
206
        del f5, f6
 
207
 
 
208
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
209
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
210
 
 
211
 
 
212
 
 
213
    def test_mkdir(self):
 
214
        t = self.get_transport()
 
215
 
 
216
        # Test mkdir
 
217
        os.mkdir('dir_a')
 
218
        self.assertEqual(t.has('dir_a'), True)
 
219
        self.assertEqual(t.has('dir_b'), False)
 
220
 
 
221
        if self.readonly:
 
222
            self.assertRaises(TransportNotPossible,
 
223
                    t.mkdir, 'dir_b')
 
224
            os.mkdir('dir_b')
 
225
        else:
 
226
            t.mkdir('dir_b')
 
227
        self.assertEqual(t.has('dir_b'), True)
 
228
        self.assert_(os.path.isdir('dir_b'))
 
229
 
 
230
        if self.readonly:
 
231
            self.assertRaises(TransportNotPossible,
 
232
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
233
            os.mkdir('dir_c')
 
234
            os.mkdir('dir_d')
 
235
        else:
 
236
            t.mkdir_multi(['dir_c', 'dir_d'])
 
237
 
 
238
        if self.readonly:
 
239
            self.assertRaises(TransportNotPossible,
 
240
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
241
            os.mkdir('dir_e')
 
242
            os.mkdir('dir_f')
 
243
        else:
 
244
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
245
        self.assertEqual(list(t.has_multi(
 
246
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
247
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
248
            [True, True, True, False,
 
249
             True, True, True, True])
 
250
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
251
            self.assert_(os.path.isdir(d))
 
252
 
 
253
        if not self.readonly:
 
254
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
255
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
256
 
 
257
        # Make sure the transport recognizes when a
 
258
        # directory is created by other means
 
259
        # Caching Transports will fail, because dir_e was already seen not
 
260
        # to exist. So instead, we will search for a new directory
 
261
        #os.mkdir('dir_e')
 
262
        #if not self.readonly:
 
263
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
264
 
 
265
        os.mkdir('dir_g')
 
266
        if not self.readonly:
 
267
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
268
 
 
269
        # Test get/put in sub-directories
 
270
        if self.readonly:
 
271
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
272
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
273
        else:
 
274
            self.assertEqual(
 
275
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
276
                             ('dir_b/b', 'contents of dir_b/b')])
 
277
                          , 2)
 
278
        for f in ('dir_a/a', 'dir_b/b'):
 
279
            self.assertEqual(t.get(f).read(), open(f).read())
 
280
 
 
281
    def test_copy_to(self):
 
282
        import tempfile
 
283
        from bzrlib.transport.local import LocalTransport
 
284
 
 
285
        t = self.get_transport()
 
286
 
 
287
        files = ['a', 'b', 'c', 'd']
 
288
        self.build_tree(files)
 
289
 
 
290
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
291
        dtmp_base = os.path.basename(dtmp)
 
292
        local_t = LocalTransport(dtmp)
 
293
 
 
294
        t.copy_to(files, local_t)
 
295
        for f in files:
 
296
            self.assertEquals(open(f).read(),
 
297
                    open(os.path.join(dtmp_base, f)).read())
 
298
 
 
299
        # Test that copying into a missing directory raises
 
300
        # NoSuchFile
 
301
        os.mkdir('e')
 
302
        open('e/f', 'wb').write('contents of e')
 
303
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
 
304
 
 
305
        os.mkdir(os.path.join(dtmp_base, 'e'))
 
306
        t.copy_to(['e/f'], local_t)
 
307
 
 
308
        del dtmp, dtmp_base, local_t
 
309
 
 
310
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
311
        dtmp_base = os.path.basename(dtmp)
 
312
        local_t = LocalTransport(dtmp)
 
313
 
 
314
        files = ['a', 'b', 'c', 'd']
 
315
        t.copy_to(iter(files), local_t)
 
316
        for f in files:
 
317
            self.assertEquals(open(f).read(),
 
318
                    open(os.path.join(dtmp_base, f)).read())
 
319
 
 
320
        del dtmp, dtmp_base, local_t
 
321
 
 
322
    def test_append(self):
 
323
        t = self.get_transport()
 
324
 
 
325
        if self.readonly:
 
326
            open('a', 'wb').write('diff\ncontents for\na\n')
 
327
            open('b', 'wb').write('contents\nfor b\n')
 
328
        else:
 
329
            t.put_multi([
 
330
                    ('a', 'diff\ncontents for\na\n'),
 
331
                    ('b', 'contents\nfor b\n')
 
332
                    ])
 
333
 
 
334
        if self.readonly:
 
335
            self.assertRaises(TransportNotPossible,
 
336
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
337
            _append('a', 'add\nsome\nmore\ncontents\n')
 
338
        else:
 
339
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
340
 
 
341
        self.check_file_contents('a', 
 
342
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
343
 
 
344
        if self.readonly:
 
345
            self.assertRaises(TransportNotPossible,
 
346
                    t.append_multi,
 
347
                        [('a', 'and\nthen\nsome\nmore\n'),
 
348
                         ('b', 'some\nmore\nfor\nb\n')])
 
349
            _append('a', 'and\nthen\nsome\nmore\n')
 
350
            _append('b', 'some\nmore\nfor\nb\n')
 
351
        else:
 
352
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
353
                    ('b', 'some\nmore\nfor\nb\n')])
 
354
        self.check_file_contents('a', 
 
355
            'diff\ncontents for\na\n'
 
356
            'add\nsome\nmore\ncontents\n'
 
357
            'and\nthen\nsome\nmore\n')
 
358
        self.check_file_contents('b', 
 
359
                'contents\nfor b\n'
 
360
                'some\nmore\nfor\nb\n')
 
361
 
 
362
        if self.readonly:
 
363
            _append('a', 'a little bit more\n')
 
364
            _append('b', 'from an iterator\n')
 
365
        else:
 
366
            t.append_multi(iter([('a', 'a little bit more\n'),
 
367
                    ('b', 'from an iterator\n')]))
 
368
        self.check_file_contents('a', 
 
369
            'diff\ncontents for\na\n'
 
370
            'add\nsome\nmore\ncontents\n'
 
371
            'and\nthen\nsome\nmore\n'
 
372
            'a little bit more\n')
 
373
        self.check_file_contents('b', 
 
374
                'contents\nfor b\n'
 
375
                'some\nmore\nfor\nb\n'
 
376
                'from an iterator\n')
 
377
 
 
378
    def test_append_file(self):
 
379
        t = self.get_transport()
 
380
 
 
381
        contents = [
 
382
            ('f1', 'this is a string\nand some more stuff\n'),
 
383
            ('f2', 'here is some text\nand a bit more\n'),
 
384
            ('f3', 'some text for the\nthird file created\n'),
 
385
            ('f4', 'this is a string\nand some more stuff\n'),
 
386
            ('f5', 'here is some text\nand a bit more\n'),
 
387
            ('f6', 'some text for the\nthird file created\n')
 
388
        ]
 
389
        
 
390
        if self.readonly:
 
391
            for f, val in contents:
 
392
                open(f, 'wb').write(val)
 
393
        else:
 
394
            t.put_multi(contents)
 
395
 
 
396
        a1 = StringIO('appending to\none\n')
 
397
        if self.readonly:
 
398
            _append('f1', a1.read())
 
399
        else:
 
400
            t.append('f1', a1)
 
401
 
 
402
        del a1
 
403
 
 
404
        self.check_file_contents('f1', 
 
405
                'this is a string\nand some more stuff\n'
 
406
                'appending to\none\n')
 
407
 
 
408
        a2 = StringIO('adding more\ntext to two\n')
 
409
        a3 = StringIO('some garbage\nto put in three\n')
 
410
 
 
411
        if self.readonly:
 
412
            _append('f2', a2.read())
 
413
            _append('f3', a3.read())
 
414
        else:
 
415
            t.append_multi([('f2', a2), ('f3', a3)])
 
416
 
 
417
        del a2, a3
 
418
 
 
419
        self.check_file_contents('f2',
 
420
                'here is some text\nand a bit more\n'
 
421
                'adding more\ntext to two\n')
 
422
        self.check_file_contents('f3', 
 
423
                'some text for the\nthird file created\n'
 
424
                'some garbage\nto put in three\n')
 
425
 
 
426
        # Test that an actual file object can be used with put
 
427
        a4 = open('f1', 'rb')
 
428
        if self.readonly:
 
429
            _append('f4', a4.read())
 
430
        else:
 
431
            t.append('f4', a4)
 
432
 
 
433
        del a4
 
434
 
 
435
        self.check_file_contents('f4', 
 
436
                'this is a string\nand some more stuff\n'
 
437
                'this is a string\nand some more stuff\n'
 
438
                'appending to\none\n')
 
439
 
 
440
        a5 = open('f2', 'rb')
 
441
        a6 = open('f3', 'rb')
 
442
        if self.readonly:
 
443
            _append('f5', a5.read())
 
444
            _append('f6', a6.read())
 
445
        else:
 
446
            t.append_multi([('f5', a5), ('f6', a6)])
 
447
 
 
448
        del a5, a6
 
449
 
 
450
        self.check_file_contents('f5',
 
451
                'here is some text\nand a bit more\n'
 
452
                'here is some text\nand a bit more\n'
 
453
                'adding more\ntext to two\n')
 
454
        self.check_file_contents('f6',
 
455
                'some text for the\nthird file created\n'
 
456
                'some text for the\nthird file created\n'
 
457
                'some garbage\nto put in three\n')
 
458
 
 
459
    def test_delete(self):
 
460
        # TODO: Test Transport.delete
 
461
        t = self.get_transport()
 
462
 
 
463
    def test_move(self):
 
464
        # TODO: Test Transport.move
 
465
        t = self.get_transport()
 
466
 
 
467
    def test_copy(self):
 
468
        # TODO: Test Transport.move
 
469
        t = self.get_transport()
 
470
 
 
471
    def test_connection_error(self):
 
472
        """ConnectionError is raised when connection is impossible"""
 
473
        if not hasattr(self, "get_bogus_transport"):
 
474
            return
 
475
        t = self.get_bogus_transport()
 
476
        try:
 
477
            t.get('.bzr/branch')
 
478
        except (ConnectionError, NoSuchFile), e:
 
479
            pass
 
480
        except (Exception), e:
 
481
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
482
        else:
 
483
            self.failIf(True, 'Did not get the expected exception.')
 
484
 
 
485
        
 
486
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
487
    def get_transport(self):
 
488
        from bzrlib.transport.local import LocalTransport
 
489
        return LocalTransport(u'.')
 
490
 
 
491
 
 
492
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
493
 
 
494
    readonly = True
 
495
 
 
496
    def get_transport(self):
 
497
        from bzrlib.transport.http import HttpTransport
 
498
        url = self.get_remote_url(u'.')
 
499
        return HttpTransport(url)
 
500
 
 
501
    def get_bogus_transport(self):
 
502
        from bzrlib.transport.http import HttpTransport
 
503
        return HttpTransport('http://jasldkjsalkdjalksjdkljasd')
161
504
 
162
505
 
163
506
class TestMemoryTransport(TestCase):
164
507
 
165
508
    def test_get_transport(self):
166
 
        MemoryTransport()
 
509
        memory.MemoryTransport()
167
510
 
168
511
    def test_clone(self):
169
 
        transport = MemoryTransport()
170
 
        self.assertTrue(isinstance(transport, MemoryTransport))
171
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
512
        transport = memory.MemoryTransport()
 
513
        self.failUnless(transport.clone() is transport)
172
514
 
173
515
    def test_abspath(self):
174
 
        transport = MemoryTransport()
175
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
176
 
 
177
 
    def test_abspath_of_root(self):
178
 
        transport = MemoryTransport()
179
 
        self.assertEqual("memory:///", transport.base)
180
 
        self.assertEqual("memory:///", transport.abspath('/'))
 
516
        transport = memory.MemoryTransport()
 
517
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
181
518
 
182
519
    def test_relpath(self):
183
 
        transport = MemoryTransport()
 
520
        transport = memory.MemoryTransport()
184
521
 
185
522
    def test_append_and_get(self):
186
 
        transport = MemoryTransport()
187
 
        transport.append_bytes('path', 'content')
 
523
        transport = memory.MemoryTransport()
 
524
        transport.append('path', StringIO('content'))
188
525
        self.assertEqual(transport.get('path').read(), 'content')
189
 
        transport.append_file('path', StringIO('content'))
 
526
        transport.append('path', StringIO('content'))
190
527
        self.assertEqual(transport.get('path').read(), 'contentcontent')
191
528
 
192
529
    def test_put_and_get(self):
193
 
        transport = MemoryTransport()
194
 
        transport.put_file('path', StringIO('content'))
 
530
        transport = memory.MemoryTransport()
 
531
        transport.put('path', StringIO('content'))
195
532
        self.assertEqual(transport.get('path').read(), 'content')
196
 
        transport.put_bytes('path', 'content')
 
533
        transport.put('path', StringIO('content'))
197
534
        self.assertEqual(transport.get('path').read(), 'content')
198
535
 
199
536
    def test_append_without_dir_fails(self):
200
 
        transport = MemoryTransport()
 
537
        transport = memory.MemoryTransport()
201
538
        self.assertRaises(NoSuchFile,
202
 
                          transport.append_bytes, 'dir/path', 'content')
 
539
                          transport.append, 'dir/path', StringIO('content'))
203
540
 
204
541
    def test_put_without_dir_fails(self):
205
 
        transport = MemoryTransport()
 
542
        transport = memory.MemoryTransport()
206
543
        self.assertRaises(NoSuchFile,
207
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
544
                          transport.put, 'dir/path', StringIO('content'))
208
545
 
209
546
    def test_get_missing(self):
210
 
        transport = MemoryTransport()
 
547
        transport = memory.MemoryTransport()
211
548
        self.assertRaises(NoSuchFile, transport.get, 'foo')
212
549
 
213
550
    def test_has_missing(self):
214
 
        transport = MemoryTransport()
 
551
        transport = memory.MemoryTransport()
215
552
        self.assertEquals(False, transport.has('foo'))
216
553
 
217
554
    def test_has_present(self):
218
 
        transport = MemoryTransport()
219
 
        transport.append_bytes('foo', 'content')
 
555
        transport = memory.MemoryTransport()
 
556
        transport.append('foo', StringIO('content'))
220
557
        self.assertEquals(True, transport.has('foo'))
221
558
 
222
559
    def test_mkdir(self):
223
 
        transport = MemoryTransport()
 
560
        transport = memory.MemoryTransport()
224
561
        transport.mkdir('dir')
225
 
        transport.append_bytes('dir/path', 'content')
 
562
        transport.append('dir/path', StringIO('content'))
226
563
        self.assertEqual(transport.get('dir/path').read(), 'content')
227
564
 
228
565
    def test_mkdir_missing_parent(self):
229
 
        transport = MemoryTransport()
 
566
        transport = memory.MemoryTransport()
230
567
        self.assertRaises(NoSuchFile,
231
568
                          transport.mkdir, 'dir/dir')
232
569
 
233
570
    def test_mkdir_twice(self):
234
 
        transport = MemoryTransport()
 
571
        transport = memory.MemoryTransport()
235
572
        transport.mkdir('dir')
236
573
        self.assertRaises(FileExists, transport.mkdir, 'dir')
237
574
 
238
575
    def test_parameters(self):
239
 
        transport = MemoryTransport()
 
576
        transport = memory.MemoryTransport()
240
577
        self.assertEqual(True, transport.listable())
241
578
        self.assertEqual(False, transport.should_cache())
242
 
        self.assertEqual(False, transport.is_readonly())
243
579
 
244
580
    def test_iter_files_recursive(self):
245
 
        transport = MemoryTransport()
 
581
        transport = memory.MemoryTransport()
246
582
        transport.mkdir('dir')
247
 
        transport.put_bytes('dir/foo', 'content')
248
 
        transport.put_bytes('dir/bar', 'content')
249
 
        transport.put_bytes('bar', 'content')
 
583
        transport.put('dir/foo', StringIO('content'))
 
584
        transport.put('dir/bar', StringIO('content'))
 
585
        transport.put('bar', StringIO('content'))
250
586
        paths = set(transport.iter_files_recursive())
251
587
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
252
588
 
253
589
    def test_stat(self):
254
 
        transport = MemoryTransport()
255
 
        transport.put_bytes('foo', 'content')
256
 
        transport.put_bytes('bar', 'phowar')
 
590
        transport = memory.MemoryTransport()
 
591
        transport.put('foo', StringIO('content'))
 
592
        transport.put('bar', StringIO('phowar'))
257
593
        self.assertEqual(7, transport.stat('foo').st_size)
258
594
        self.assertEqual(6, transport.stat('bar').st_size)
259
595
 
260
 
        
261
 
class ReadonlyDecoratorTransportTest(TestCase):
262
 
    """Readonly decoration specific tests."""
263
 
 
264
 
    def test_local_parameters(self):
265
 
        import bzrlib.transport.readonly as readonly
266
 
        # connect to . in readonly mode
267
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
268
 
        self.assertEqual(True, transport.listable())
269
 
        self.assertEqual(False, transport.should_cache())
270
 
        self.assertEqual(True, transport.is_readonly())
271
 
 
272
 
    def test_http_parameters(self):
273
 
        import bzrlib.transport.readonly as readonly
274
 
        from bzrlib.transport.http import HttpServer
275
 
        # connect to . via http which is not listable
276
 
        server = HttpServer()
277
 
        server.setUp()
278
 
        try:
279
 
            transport = get_transport('readonly+' + server.get_url())
280
 
            self.failUnless(isinstance(transport,
281
 
                                       readonly.ReadonlyTransportDecorator))
282
 
            self.assertEqual(False, transport.listable())
283
 
            self.assertEqual(True, transport.should_cache())
284
 
            self.assertEqual(True, transport.is_readonly())
285
 
        finally:
286
 
            server.tearDown()
287
 
 
288
 
 
289
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
290
 
    """NFS decorator specific tests."""
291
 
 
292
 
    def get_nfs_transport(self, url):
293
 
        import bzrlib.transport.fakenfs as fakenfs
294
 
        # connect to url with nfs decoration
295
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
296
 
 
297
 
    def test_local_parameters(self):
298
 
        # the listable, should_cache and is_readonly parameters
299
 
        # are not changed by the fakenfs decorator
300
 
        transport = self.get_nfs_transport('.')
301
 
        self.assertEqual(True, transport.listable())
302
 
        self.assertEqual(False, transport.should_cache())
303
 
        self.assertEqual(False, transport.is_readonly())
304
 
 
305
 
    def test_http_parameters(self):
306
 
        # the listable, should_cache and is_readonly parameters
307
 
        # are not changed by the fakenfs decorator
308
 
        from bzrlib.transport.http import HttpServer
309
 
        # connect to . via http which is not listable
310
 
        server = HttpServer()
311
 
        server.setUp()
312
 
        try:
313
 
            transport = self.get_nfs_transport(server.get_url())
314
 
            self.assertIsInstance(
315
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
316
 
            self.assertEqual(False, transport.listable())
317
 
            self.assertEqual(True, transport.should_cache())
318
 
            self.assertEqual(True, transport.is_readonly())
319
 
        finally:
320
 
            server.tearDown()
321
 
 
322
 
    def test_fakenfs_server_default(self):
323
 
        # a FakeNFSServer() should bring up a local relpath server for itself
324
 
        import bzrlib.transport.fakenfs as fakenfs
325
 
        server = fakenfs.FakeNFSServer()
326
 
        server.setUp()
327
 
        try:
328
 
            # the server should be a relpath localhost server
329
 
            self.assertEqual(server.get_url(), 'fakenfs+.')
330
 
            # and we should be able to get a transport for it
331
 
            transport = get_transport(server.get_url())
332
 
            # which must be a FakeNFSTransportDecorator instance.
333
 
            self.assertIsInstance(
334
 
                transport, fakenfs.FakeNFSTransportDecorator)
335
 
        finally:
336
 
            server.tearDown()
337
 
 
338
 
    def test_fakenfs_rename_semantics(self):
339
 
        # a FakeNFS transport must mangle the way rename errors occur to
340
 
        # look like NFS problems.
341
 
        transport = self.get_nfs_transport('.')
342
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
343
 
                        transport=transport)
344
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
345
 
                          transport.rename, 'from', 'to')
346
 
 
347
 
 
348
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
349
 
    """Tests for simulation of VFAT restrictions"""
350
 
 
351
 
    def get_vfat_transport(self, url):
352
 
        """Return vfat-backed transport for test directory"""
353
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
354
 
        return FakeVFATTransportDecorator('vfat+' + url)
355
 
 
356
 
    def test_transport_creation(self):
357
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
358
 
        transport = self.get_vfat_transport('.')
359
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
360
 
 
361
 
    def test_transport_mkdir(self):
362
 
        transport = self.get_vfat_transport('.')
363
 
        transport.mkdir('HELLO')
364
 
        self.assertTrue(transport.has('hello'))
365
 
        self.assertTrue(transport.has('Hello'))
366
 
 
367
 
    def test_forbidden_chars(self):
368
 
        transport = self.get_vfat_transport('.')
369
 
        self.assertRaises(ValueError, transport.has, "<NU>")
370
 
 
371
 
 
372
 
class BadTransportHandler(Transport):
373
 
    def __init__(self, base_url):
374
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
375
 
 
376
 
 
377
 
class BackupTransportHandler(Transport):
378
 
    """Test transport that works as a backup for the BadTransportHandler"""
379
 
    pass
380
 
 
381
 
 
382
 
class TestTransportImplementation(TestCaseInTempDir):
383
 
    """Implementation verification for transports.
384
 
    
385
 
    To verify a transport we need a server factory, which is a callable
386
 
    that accepts no parameters and returns an implementation of
387
 
    bzrlib.transport.Server.
388
 
    
389
 
    That Server is then used to construct transport instances and test
390
 
    the transport via loopback activity.
391
 
 
392
 
    Currently this assumes that the Transport object is connected to the 
393
 
    current working directory.  So that whatever is done 
394
 
    through the transport, should show up in the working 
395
 
    directory, and vice-versa. This is a bug, because its possible to have
396
 
    URL schemes which provide access to something that may not be 
397
 
    result in storage on the local disk, i.e. due to file system limits, or 
398
 
    due to it being a database or some other non-filesystem tool.
399
 
 
400
 
    This also tests to make sure that the functions work with both
401
 
    generators and lists (assuming iter(list) is effectively a generator)
402
 
    """
403
 
    
404
 
    def setUp(self):
405
 
        super(TestTransportImplementation, self).setUp()
406
 
        self._server = self.transport_server()
407
 
        self._server.setUp()
408
 
 
409
 
    def tearDown(self):
410
 
        super(TestTransportImplementation, self).tearDown()
411
 
        self._server.tearDown()
412
 
        
413
 
    def get_transport(self):
414
 
        """Return a connected transport to the local directory."""
415
 
        base_url = self._server.get_url()
416
 
        # try getting the transport via the regular interface:
417
 
        t = get_transport(base_url)
418
 
        if not isinstance(t, self.transport_class): 
419
 
            # we did not get the correct transport class type. Override the
420
 
            # regular connection behaviour by direct construction.
421
 
            t = self.transport_class(base_url)
422
 
        return t