~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testtransport.py

  • Committer: Robert Collins
  • Date: 2005-10-18 13:11:57 UTC
  • mfrom: (1185.16.72) (0.2.1)
  • Revision ID: robertc@robertcollins.net-20051018131157-76a9970aa78e927e
Merged Martin.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
18
import os
19
 
import sys
20
 
import stat
21
19
from cStringIO import StringIO
22
20
 
23
 
import bzrlib
24
 
from bzrlib import urlutils
25
 
from bzrlib.errors import (NoSuchFile, FileExists,
26
 
                           TransportNotPossible,
27
 
                           ConnectionError,
28
 
                           DependencyNotPresent,
29
 
                           UnsupportedProtocol,
30
 
                           PathNotChild,
31
 
                           )
32
 
from bzrlib.tests import TestCase, TestCaseInTempDir
33
 
from bzrlib.transport import (_CoalescedOffset,
34
 
                              _get_protocol_handlers,
35
 
                              _get_transport_modules,
36
 
                              get_transport,
37
 
                              register_lazy_transport,
38
 
                              _set_protocol_handlers,
39
 
                              Transport,
40
 
                              )
41
 
from bzrlib.transport.memory import MemoryTransport
42
 
from bzrlib.transport.local import LocalTransport
43
 
 
44
 
 
45
 
# TODO: Should possibly split transport-specific tests into their own files.
46
 
 
47
 
 
48
 
class TestTransport(TestCase):
49
 
    """Test the non transport-concrete class functionality."""
50
 
 
51
 
    def test__get_set_protocol_handlers(self):
52
 
        handlers = _get_protocol_handlers()
53
 
        self.assertNotEqual({}, handlers)
54
 
        try:
55
 
            _set_protocol_handlers({})
56
 
            self.assertEqual({}, _get_protocol_handlers())
57
 
        finally:
58
 
            _set_protocol_handlers(handlers)
59
 
 
60
 
    def test_get_transport_modules(self):
61
 
        handlers = _get_protocol_handlers()
62
 
        class SampleHandler(object):
63
 
            """I exist, isnt that enough?"""
64
 
        try:
65
 
            my_handlers = {}
66
 
            _set_protocol_handlers(my_handlers)
67
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
68
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
69
 
            self.assertEqual([SampleHandler.__module__],
70
 
                             _get_transport_modules())
71
 
        finally:
72
 
            _set_protocol_handlers(handlers)
73
 
 
74
 
    def test_transport_dependency(self):
75
 
        """Transport with missing dependency causes no error"""
76
 
        saved_handlers = _get_protocol_handlers()
77
 
        try:
78
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
79
 
                    'BadTransportHandler')
80
 
            try:
81
 
                get_transport('foo://fooserver/foo')
82
 
            except UnsupportedProtocol, e:
83
 
                e_str = str(e)
84
 
                self.assertEquals('Unsupported protocol'
85
 
                                  ' for url "foo://fooserver/foo":'
86
 
                                  ' Unable to import library "some_lib":'
87
 
                                  ' testing missing dependency', str(e))
88
 
            else:
89
 
                self.fail('Did not raise UnsupportedProtocol')
90
 
        finally:
91
 
            # restore original values
92
 
            _set_protocol_handlers(saved_handlers)
93
 
            
94
 
    def test_transport_fallback(self):
95
 
        """Transport with missing dependency causes no error"""
96
 
        saved_handlers = _get_protocol_handlers()
97
 
        try:
98
 
            _set_protocol_handlers({})
99
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
100
 
                    'BackupTransportHandler')
101
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
102
 
                    'BadTransportHandler')
103
 
            t = get_transport('foo://fooserver/foo')
104
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
105
 
        finally:
106
 
            _set_protocol_handlers(saved_handlers)
107
 
 
108
 
    def test__combine_paths(self):
109
 
        t = Transport('/')
110
 
        self.assertEqual('/home/sarah/project/foo',
111
 
                         t._combine_paths('/home/sarah', 'project/foo'))
112
 
        self.assertEqual('/etc',
113
 
                         t._combine_paths('/home/sarah', '../../etc'))
114
 
        self.assertEqual('/etc',
115
 
                         t._combine_paths('/home/sarah', '../../../etc'))
116
 
        self.assertEqual('/etc',
117
 
                         t._combine_paths('/home/sarah', '/etc'))
118
 
 
119
 
 
120
 
class TestCoalesceOffsets(TestCase):
121
 
    
122
 
    def check(self, expected, offsets, limit=0, fudge=0):
123
 
        coalesce = Transport._coalesce_offsets
124
 
        exp = [_CoalescedOffset(*x) for x in expected]
125
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
126
 
        self.assertEqual(exp, out)
127
 
 
128
 
    def test_coalesce_empty(self):
129
 
        self.check([], [])
130
 
 
131
 
    def test_coalesce_simple(self):
132
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
133
 
 
134
 
    def test_coalesce_unrelated(self):
135
 
        self.check([(0, 10, [(0, 10)]),
136
 
                    (20, 10, [(0, 10)]),
137
 
                   ], [(0, 10), (20, 10)])
138
 
            
139
 
    def test_coalesce_unsorted(self):
140
 
        self.check([(20, 10, [(0, 10)]),
141
 
                    (0, 10, [(0, 10)]),
142
 
                   ], [(20, 10), (0, 10)])
143
 
 
144
 
    def test_coalesce_nearby(self):
145
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
146
 
                   [(0, 10), (10, 10)])
147
 
 
148
 
    def test_coalesce_overlapped(self):
149
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
150
 
                   [(0, 10), (5, 10)])
151
 
 
152
 
    def test_coalesce_limit(self):
153
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
154
 
                              (30, 10), (40, 10)]),
155
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
156
 
                              (30, 10), (40, 10)]),
157
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
158
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
159
 
                       (90, 10), (100, 10)],
160
 
                    limit=5)
161
 
 
162
 
    def test_coalesce_no_limit(self):
163
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
164
 
                               (30, 10), (40, 10), (50, 10),
165
 
                               (60, 10), (70, 10), (80, 10),
166
 
                               (90, 10)]),
167
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
168
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
169
 
                       (90, 10), (100, 10)])
170
 
 
171
 
    def test_coalesce_fudge(self):
172
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
173
 
                    (100, 10, [(0, 10),]),
174
 
                   ], [(10, 10), (30, 10), (100, 10)],
175
 
                   fudge=10
176
 
                  )
 
21
from bzrlib.errors import NoSuchFile, FileExists, TransportNotPossible
 
22
from bzrlib.selftest import TestCase, TestCaseInTempDir
 
23
from bzrlib.selftest.HTTPTestUtil import TestCaseWithWebserver
 
24
from bzrlib.transport import memory
 
25
 
 
26
 
 
27
def _append(fn, txt):
 
28
    """Append the given text (file-like object) to the supplied filename."""
 
29
    f = open(fn, 'ab')
 
30
    f.write(txt)
 
31
    f.flush()
 
32
    f.close()
 
33
    del f
 
34
 
 
35
class TestTransportMixIn(object):
 
36
    """Subclass this, and it will provide a series of tests for a Transport.
 
37
    It assumes that the Transport object is connected to the 
 
38
    current working directory.  So that whatever is done 
 
39
    through the transport, should show up in the working 
 
40
    directory, and vice-versa.
 
41
 
 
42
    This also tests to make sure that the functions work with both
 
43
    generators and lists (assuming iter(list) is effectively a generator)
 
44
    """
 
45
    readonly = False
 
46
    def get_transport(self):
 
47
        """Children should override this to return the Transport object.
 
48
        """
 
49
        raise NotImplementedError
 
50
 
 
51
    def test_has(self):
 
52
        t = self.get_transport()
 
53
 
 
54
        files = ['a', 'b', 'e', 'g']
 
55
        self.build_tree(files)
 
56
        self.assertEqual(t.has('a'), True)
 
57
        self.assertEqual(t.has('c'), False)
 
58
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
59
                [True, True, False, False, True, False, True, False])
 
60
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
61
                [True, True, False, False, True, False, True, False])
 
62
 
 
63
    def test_get(self):
 
64
        t = self.get_transport()
 
65
 
 
66
        files = ['a', 'b', 'e', 'g']
 
67
        self.build_tree(files)
 
68
        self.assertEqual(t.get('a').read(), open('a').read())
 
69
        content_f = t.get_multi(files)
 
70
        for path,f in zip(files, content_f):
 
71
            self.assertEqual(open(path).read(), f.read())
 
72
 
 
73
        content_f = t.get_multi(iter(files))
 
74
        for path,f in zip(files, content_f):
 
75
            self.assertEqual(open(path).read(), f.read())
 
76
 
 
77
        self.assertRaises(NoSuchFile, t.get, 'c')
 
78
        try:
 
79
            files = list(t.get_multi(['a', 'b', 'c']))
 
80
        except NoSuchFile:
 
81
            pass
 
82
        else:
 
83
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
84
        try:
 
85
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
86
        except NoSuchFile:
 
87
            pass
 
88
        else:
 
89
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
90
 
 
91
    def test_put(self):
 
92
        t = self.get_transport()
 
93
 
 
94
        if self.readonly:
 
95
            self.assertRaises(TransportNotPossible,
 
96
                    t.put, 'a', 'some text for a\n')
 
97
            open('a', 'wb').write('some text for a\n')
 
98
        else:
 
99
            t.put('a', 'some text for a\n')
 
100
        self.assert_(os.path.exists('a'))
 
101
        self.check_file_contents('a', 'some text for a\n')
 
102
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
103
        # Make sure 'has' is updated
 
104
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
105
                [True, False, False, False, False])
 
106
        if self.readonly:
 
107
            self.assertRaises(TransportNotPossible,
 
108
                    t.put_multi,
 
109
                    [('a', 'new\ncontents for\na\n'),
 
110
                        ('d', 'contents\nfor d\n')])
 
111
            open('a', 'wb').write('new\ncontents for\na\n')
 
112
            open('d', 'wb').write('contents\nfor d\n')
 
113
        else:
 
114
            # Put also replaces contents
 
115
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
116
                                          ('d', 'contents\nfor d\n')]),
 
117
                             2)
 
118
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
119
                [True, False, False, True, False])
 
120
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
121
        self.check_file_contents('d', 'contents\nfor d\n')
 
122
 
 
123
        if self.readonly:
 
124
            self.assertRaises(TransportNotPossible,
 
125
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
126
                                  ('d', 'another contents\nfor d\n')]))
 
127
            open('a', 'wb').write('diff\ncontents for\na\n')
 
128
            open('d', 'wb').write('another contents\nfor d\n')
 
129
        else:
 
130
            self.assertEqual(
 
131
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
132
                                  ('d', 'another contents\nfor d\n')]))
 
133
                             , 2)
 
134
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
135
        self.check_file_contents('d', 'another contents\nfor d\n')
 
136
 
 
137
        if self.readonly:
 
138
            self.assertRaises(TransportNotPossible,
 
139
                    t.put, 'path/doesnt/exist/c', 'contents')
 
140
        else:
 
141
            self.assertRaises(NoSuchFile,
 
142
                    t.put, 'path/doesnt/exist/c', 'contents')
 
143
 
 
144
    def test_put_file(self):
 
145
        t = self.get_transport()
 
146
 
 
147
        # Test that StringIO can be used as a file-like object with put
 
148
        f1 = StringIO('this is a string\nand some more stuff\n')
 
149
        if self.readonly:
 
150
            open('f1', 'wb').write(f1.read())
 
151
        else:
 
152
            t.put('f1', f1)
 
153
 
 
154
        del f1
 
155
 
 
156
        self.check_file_contents('f1', 
 
157
                'this is a string\nand some more stuff\n')
 
158
 
 
159
        f2 = StringIO('here is some text\nand a bit more\n')
 
160
        f3 = StringIO('some text for the\nthird file created\n')
 
161
 
 
162
        if self.readonly:
 
163
            open('f2', 'wb').write(f2.read())
 
164
            open('f3', 'wb').write(f3.read())
 
165
        else:
 
166
            t.put_multi([('f2', f2), ('f3', f3)])
 
167
 
 
168
        del f2, f3
 
169
 
 
170
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
171
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
172
 
 
173
        # Test that an actual file object can be used with put
 
174
        f4 = open('f1', 'rb')
 
175
        if self.readonly:
 
176
            open('f4', 'wb').write(f4.read())
 
177
        else:
 
178
            t.put('f4', f4)
 
179
 
 
180
        del f4
 
181
 
 
182
        self.check_file_contents('f4', 
 
183
                'this is a string\nand some more stuff\n')
 
184
 
 
185
        f5 = open('f2', 'rb')
 
186
        f6 = open('f3', 'rb')
 
187
        if self.readonly:
 
188
            open('f5', 'wb').write(f5.read())
 
189
            open('f6', 'wb').write(f6.read())
 
190
        else:
 
191
            t.put_multi([('f5', f5), ('f6', f6)])
 
192
 
 
193
        del f5, f6
 
194
 
 
195
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
196
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
197
 
 
198
 
 
199
 
 
200
    def test_mkdir(self):
 
201
        t = self.get_transport()
 
202
 
 
203
        # Test mkdir
 
204
        os.mkdir('dir_a')
 
205
        self.assertEqual(t.has('dir_a'), True)
 
206
        self.assertEqual(t.has('dir_b'), False)
 
207
 
 
208
        if self.readonly:
 
209
            self.assertRaises(TransportNotPossible,
 
210
                    t.mkdir, 'dir_b')
 
211
            os.mkdir('dir_b')
 
212
        else:
 
213
            t.mkdir('dir_b')
 
214
        self.assertEqual(t.has('dir_b'), True)
 
215
        self.assert_(os.path.isdir('dir_b'))
 
216
 
 
217
        if self.readonly:
 
218
            self.assertRaises(TransportNotPossible,
 
219
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
220
            os.mkdir('dir_c')
 
221
            os.mkdir('dir_d')
 
222
        else:
 
223
            t.mkdir_multi(['dir_c', 'dir_d'])
 
224
 
 
225
        if self.readonly:
 
226
            self.assertRaises(TransportNotPossible,
 
227
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
228
            os.mkdir('dir_e')
 
229
            os.mkdir('dir_f')
 
230
        else:
 
231
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
232
        self.assertEqual(list(t.has_multi(
 
233
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
234
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
235
            [True, True, True, False,
 
236
             True, True, True, True])
 
237
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
238
            self.assert_(os.path.isdir(d))
 
239
 
 
240
        if not self.readonly:
 
241
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
242
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
243
 
 
244
        # Make sure the transport recognizes when a
 
245
        # directory is created by other means
 
246
        # Caching Transports will fail, because dir_e was already seen not
 
247
        # to exist. So instead, we will search for a new directory
 
248
        #os.mkdir('dir_e')
 
249
        #if not self.readonly:
 
250
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
251
 
 
252
        os.mkdir('dir_g')
 
253
        if not self.readonly:
 
254
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
255
 
 
256
        # Test get/put in sub-directories
 
257
        if self.readonly:
 
258
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
259
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
260
        else:
 
261
            self.assertEqual(
 
262
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
263
                             ('dir_b/b', 'contents of dir_b/b')])
 
264
                          , 2)
 
265
        for f in ('dir_a/a', 'dir_b/b'):
 
266
            self.assertEqual(t.get(f).read(), open(f).read())
 
267
 
 
268
    def test_copy_to(self):
 
269
        import tempfile
 
270
        from bzrlib.transport.local import LocalTransport
 
271
 
 
272
        t = self.get_transport()
 
273
 
 
274
        files = ['a', 'b', 'c', 'd']
 
275
        self.build_tree(files)
 
276
 
 
277
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
278
        dtmp_base = os.path.basename(dtmp)
 
279
        local_t = LocalTransport(dtmp)
 
280
 
 
281
        t.copy_to(files, local_t)
 
282
        for f in files:
 
283
            self.assertEquals(open(f).read(),
 
284
                    open(os.path.join(dtmp_base, f)).read())
 
285
 
 
286
        del dtmp, dtmp_base, local_t
 
287
 
 
288
        dtmp = tempfile.mkdtemp(dir='.', prefix='test-transport-')
 
289
        dtmp_base = os.path.basename(dtmp)
 
290
        local_t = LocalTransport(dtmp)
 
291
 
 
292
        files = ['a', 'b', 'c', 'd']
 
293
        t.copy_to(iter(files), local_t)
 
294
        for f in files:
 
295
            self.assertEquals(open(f).read(),
 
296
                    open(os.path.join(dtmp_base, f)).read())
 
297
 
 
298
        del dtmp, dtmp_base, local_t
 
299
 
 
300
    def test_append(self):
 
301
        t = self.get_transport()
 
302
 
 
303
        if self.readonly:
 
304
            open('a', 'wb').write('diff\ncontents for\na\n')
 
305
            open('b', 'wb').write('contents\nfor b\n')
 
306
        else:
 
307
            t.put_multi([
 
308
                    ('a', 'diff\ncontents for\na\n'),
 
309
                    ('b', 'contents\nfor b\n')
 
310
                    ])
 
311
 
 
312
        if self.readonly:
 
313
            self.assertRaises(TransportNotPossible,
 
314
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
315
            _append('a', 'add\nsome\nmore\ncontents\n')
 
316
        else:
 
317
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
318
 
 
319
        self.check_file_contents('a', 
 
320
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
321
 
 
322
        if self.readonly:
 
323
            self.assertRaises(TransportNotPossible,
 
324
                    t.append_multi,
 
325
                        [('a', 'and\nthen\nsome\nmore\n'),
 
326
                         ('b', 'some\nmore\nfor\nb\n')])
 
327
            _append('a', 'and\nthen\nsome\nmore\n')
 
328
            _append('b', 'some\nmore\nfor\nb\n')
 
329
        else:
 
330
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
331
                    ('b', 'some\nmore\nfor\nb\n')])
 
332
        self.check_file_contents('a', 
 
333
            'diff\ncontents for\na\n'
 
334
            'add\nsome\nmore\ncontents\n'
 
335
            'and\nthen\nsome\nmore\n')
 
336
        self.check_file_contents('b', 
 
337
                'contents\nfor b\n'
 
338
                'some\nmore\nfor\nb\n')
 
339
 
 
340
        if self.readonly:
 
341
            _append('a', 'a little bit more\n')
 
342
            _append('b', 'from an iterator\n')
 
343
        else:
 
344
            t.append_multi(iter([('a', 'a little bit more\n'),
 
345
                    ('b', 'from an iterator\n')]))
 
346
        self.check_file_contents('a', 
 
347
            'diff\ncontents for\na\n'
 
348
            'add\nsome\nmore\ncontents\n'
 
349
            'and\nthen\nsome\nmore\n'
 
350
            'a little bit more\n')
 
351
        self.check_file_contents('b', 
 
352
                'contents\nfor b\n'
 
353
                'some\nmore\nfor\nb\n'
 
354
                'from an iterator\n')
 
355
 
 
356
    def test_append_file(self):
 
357
        t = self.get_transport()
 
358
 
 
359
        contents = [
 
360
            ('f1', 'this is a string\nand some more stuff\n'),
 
361
            ('f2', 'here is some text\nand a bit more\n'),
 
362
            ('f3', 'some text for the\nthird file created\n'),
 
363
            ('f4', 'this is a string\nand some more stuff\n'),
 
364
            ('f5', 'here is some text\nand a bit more\n'),
 
365
            ('f6', 'some text for the\nthird file created\n')
 
366
        ]
 
367
        
 
368
        if self.readonly:
 
369
            for f, val in contents:
 
370
                open(f, 'wb').write(val)
 
371
        else:
 
372
            t.put_multi(contents)
 
373
 
 
374
        a1 = StringIO('appending to\none\n')
 
375
        if self.readonly:
 
376
            _append('f1', a1.read())
 
377
        else:
 
378
            t.append('f1', a1)
 
379
 
 
380
        del a1
 
381
 
 
382
        self.check_file_contents('f1', 
 
383
                'this is a string\nand some more stuff\n'
 
384
                'appending to\none\n')
 
385
 
 
386
        a2 = StringIO('adding more\ntext to two\n')
 
387
        a3 = StringIO('some garbage\nto put in three\n')
 
388
 
 
389
        if self.readonly:
 
390
            _append('f2', a2.read())
 
391
            _append('f3', a3.read())
 
392
        else:
 
393
            t.append_multi([('f2', a2), ('f3', a3)])
 
394
 
 
395
        del a2, a3
 
396
 
 
397
        self.check_file_contents('f2',
 
398
                'here is some text\nand a bit more\n'
 
399
                'adding more\ntext to two\n')
 
400
        self.check_file_contents('f3', 
 
401
                'some text for the\nthird file created\n'
 
402
                'some garbage\nto put in three\n')
 
403
 
 
404
        # Test that an actual file object can be used with put
 
405
        a4 = open('f1', 'rb')
 
406
        if self.readonly:
 
407
            _append('f4', a4.read())
 
408
        else:
 
409
            t.append('f4', a4)
 
410
 
 
411
        del a4
 
412
 
 
413
        self.check_file_contents('f4', 
 
414
                'this is a string\nand some more stuff\n'
 
415
                'this is a string\nand some more stuff\n'
 
416
                'appending to\none\n')
 
417
 
 
418
        a5 = open('f2', 'rb')
 
419
        a6 = open('f3', 'rb')
 
420
        if self.readonly:
 
421
            _append('f5', a5.read())
 
422
            _append('f6', a6.read())
 
423
        else:
 
424
            t.append_multi([('f5', a5), ('f6', a6)])
 
425
 
 
426
        del a5, a6
 
427
 
 
428
        self.check_file_contents('f5',
 
429
                'here is some text\nand a bit more\n'
 
430
                'here is some text\nand a bit more\n'
 
431
                'adding more\ntext to two\n')
 
432
        self.check_file_contents('f6',
 
433
                'some text for the\nthird file created\n'
 
434
                'some text for the\nthird file created\n'
 
435
                'some garbage\nto put in three\n')
 
436
 
 
437
    def test_delete(self):
 
438
        # TODO: Test Transport.delete
 
439
        pass
 
440
 
 
441
    def test_move(self):
 
442
        # TODO: Test Transport.move
 
443
        pass
 
444
 
 
445
 
 
446
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
447
    def get_transport(self):
 
448
        from bzrlib.transport.local import LocalTransport
 
449
        return LocalTransport('.')
 
450
 
 
451
 
 
452
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
453
 
 
454
    readonly = True
 
455
 
 
456
    def get_transport(self):
 
457
        from bzrlib.transport.http import HttpTransport
 
458
        url = self.get_remote_url('.')
 
459
        return HttpTransport(url)
177
460
 
178
461
 
179
462
class TestMemoryTransport(TestCase):
180
463
 
181
464
    def test_get_transport(self):
182
 
        MemoryTransport()
 
465
        memory.MemoryTransport()
183
466
 
184
467
    def test_clone(self):
185
 
        transport = MemoryTransport()
186
 
        self.assertTrue(isinstance(transport, MemoryTransport))
187
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
468
        transport = memory.MemoryTransport()
 
469
        self.failUnless(transport.clone() is transport)
188
470
 
189
471
    def test_abspath(self):
190
 
        transport = MemoryTransport()
191
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
192
 
 
193
 
    def test_abspath_of_root(self):
194
 
        transport = MemoryTransport()
195
 
        self.assertEqual("memory:///", transport.base)
196
 
        self.assertEqual("memory:///", transport.abspath('/'))
197
 
 
198
 
    def test_abspath_of_relpath_starting_at_root(self):
199
 
        transport = MemoryTransport()
200
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
472
        transport = memory.MemoryTransport()
 
473
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
 
474
 
 
475
    def test_relpath(self):
 
476
        transport = memory.MemoryTransport()
201
477
 
202
478
    def test_append_and_get(self):
203
 
        transport = MemoryTransport()
204
 
        transport.append_bytes('path', 'content')
 
479
        transport = memory.MemoryTransport()
 
480
        transport.append('path', StringIO('content'))
205
481
        self.assertEqual(transport.get('path').read(), 'content')
206
 
        transport.append_file('path', StringIO('content'))
 
482
        transport.append('path', StringIO('content'))
207
483
        self.assertEqual(transport.get('path').read(), 'contentcontent')
208
484
 
209
485
    def test_put_and_get(self):
210
 
        transport = MemoryTransport()
211
 
        transport.put_file('path', StringIO('content'))
 
486
        transport = memory.MemoryTransport()
 
487
        transport.put('path', StringIO('content'))
212
488
        self.assertEqual(transport.get('path').read(), 'content')
213
 
        transport.put_bytes('path', 'content')
 
489
        transport.put('path', StringIO('content'))
214
490
        self.assertEqual(transport.get('path').read(), 'content')
215
491
 
216
492
    def test_append_without_dir_fails(self):
217
 
        transport = MemoryTransport()
 
493
        transport = memory.MemoryTransport()
218
494
        self.assertRaises(NoSuchFile,
219
 
                          transport.append_bytes, 'dir/path', 'content')
 
495
                          transport.append, 'dir/path', StringIO('content'))
220
496
 
221
497
    def test_put_without_dir_fails(self):
222
 
        transport = MemoryTransport()
 
498
        transport = memory.MemoryTransport()
223
499
        self.assertRaises(NoSuchFile,
224
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
500
                          transport.put, 'dir/path', StringIO('content'))
225
501
 
226
502
    def test_get_missing(self):
227
 
        transport = MemoryTransport()
 
503
        transport = memory.MemoryTransport()
228
504
        self.assertRaises(NoSuchFile, transport.get, 'foo')
229
505
 
230
506
    def test_has_missing(self):
231
 
        transport = MemoryTransport()
 
507
        transport = memory.MemoryTransport()
232
508
        self.assertEquals(False, transport.has('foo'))
233
509
 
234
510
    def test_has_present(self):
235
 
        transport = MemoryTransport()
236
 
        transport.append_bytes('foo', 'content')
 
511
        transport = memory.MemoryTransport()
 
512
        transport.append('foo', StringIO('content'))
237
513
        self.assertEquals(True, transport.has('foo'))
238
514
 
239
 
    def test_list_dir(self):
240
 
        transport = MemoryTransport()
241
 
        transport.put_bytes('foo', 'content')
242
 
        transport.mkdir('dir')
243
 
        transport.put_bytes('dir/subfoo', 'content')
244
 
        transport.put_bytes('dirlike', 'content')
245
 
 
246
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
247
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
248
 
 
249
515
    def test_mkdir(self):
250
 
        transport = MemoryTransport()
 
516
        transport = memory.MemoryTransport()
251
517
        transport.mkdir('dir')
252
 
        transport.append_bytes('dir/path', 'content')
 
518
        transport.append('dir/path', StringIO('content'))
253
519
        self.assertEqual(transport.get('dir/path').read(), 'content')
254
520
 
255
521
    def test_mkdir_missing_parent(self):
256
 
        transport = MemoryTransport()
 
522
        transport = memory.MemoryTransport()
257
523
        self.assertRaises(NoSuchFile,
258
524
                          transport.mkdir, 'dir/dir')
259
525
 
260
526
    def test_mkdir_twice(self):
261
 
        transport = MemoryTransport()
 
527
        transport = memory.MemoryTransport()
262
528
        transport.mkdir('dir')
263
529
        self.assertRaises(FileExists, transport.mkdir, 'dir')
264
530
 
265
531
    def test_parameters(self):
266
 
        transport = MemoryTransport()
 
532
        transport = memory.MemoryTransport()
267
533
        self.assertEqual(True, transport.listable())
268
534
        self.assertEqual(False, transport.should_cache())
269
 
        self.assertEqual(False, transport.is_readonly())
270
535
 
271
536
    def test_iter_files_recursive(self):
272
 
        transport = MemoryTransport()
 
537
        transport = memory.MemoryTransport()
273
538
        transport.mkdir('dir')
274
 
        transport.put_bytes('dir/foo', 'content')
275
 
        transport.put_bytes('dir/bar', 'content')
276
 
        transport.put_bytes('bar', 'content')
 
539
        transport.put('dir/foo', StringIO('content'))
 
540
        transport.put('dir/bar', StringIO('content'))
 
541
        transport.put('bar', StringIO('content'))
277
542
        paths = set(transport.iter_files_recursive())
278
543
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
279
544
 
280
545
    def test_stat(self):
281
 
        transport = MemoryTransport()
282
 
        transport.put_bytes('foo', 'content')
283
 
        transport.put_bytes('bar', 'phowar')
 
546
        transport = memory.MemoryTransport()
 
547
        transport.put('foo', StringIO('content'))
 
548
        transport.put('bar', StringIO('phowar'))
284
549
        self.assertEqual(7, transport.stat('foo').st_size)
285
550
        self.assertEqual(6, transport.stat('bar').st_size)
286
 
 
287
 
 
288
 
class ChrootDecoratorTransportTest(TestCase):
289
 
    """Chroot decoration specific tests."""
290
 
 
291
 
    def test_construct(self):
292
 
        from bzrlib.transport import chroot
293
 
        transport = chroot.ChrootTransportDecorator('chroot+memory:///pathA/')
294
 
        self.assertEqual('memory:///pathA/', transport.chroot_url)
295
 
 
296
 
        transport = chroot.ChrootTransportDecorator(
297
 
            'chroot+memory:///path/B', chroot='memory:///path/')
298
 
        self.assertEqual('memory:///path/', transport.chroot_url)
299
 
 
300
 
    def test_append_file(self):
301
 
        transport = get_transport('chroot+memory:///foo/bar')
302
 
        self.assertRaises(PathNotChild, transport.append_file, '/foo', None)
303
 
 
304
 
    def test_append_bytes(self):
305
 
        transport = get_transport('chroot+memory:///foo/bar')
306
 
        self.assertRaises(PathNotChild, transport.append_bytes, '/foo', 'bytes')
307
 
 
308
 
    def test_clone(self):
309
 
        transport = get_transport('chroot+memory:///foo/bar')
310
 
        self.assertRaises(PathNotChild, transport.clone, '/foo')
311
 
 
312
 
    def test_delete(self):
313
 
        transport = get_transport('chroot+memory:///foo/bar')
314
 
        self.assertRaises(PathNotChild, transport.delete, '/foo')
315
 
 
316
 
    def test_delete_tree(self):
317
 
        transport = get_transport('chroot+memory:///foo/bar')
318
 
        self.assertRaises(PathNotChild, transport.delete_tree, '/foo')
319
 
 
320
 
    def test_get(self):
321
 
        transport = get_transport('chroot+memory:///foo/bar')
322
 
        self.assertRaises(PathNotChild, transport.get, '/foo')
323
 
 
324
 
    def test_get_bytes(self):
325
 
        transport = get_transport('chroot+memory:///foo/bar')
326
 
        self.assertRaises(PathNotChild, transport.get_bytes, '/foo')
327
 
 
328
 
    def test_has(self):
329
 
        transport = get_transport('chroot+memory:///foo/bar')
330
 
        self.assertRaises(PathNotChild, transport.has, '/foo')
331
 
 
332
 
    def test_list_dir(self):
333
 
        transport = get_transport('chroot+memory:///foo/bar')
334
 
        self.assertRaises(PathNotChild, transport.list_dir, '/foo')
335
 
 
336
 
    def test_lock_read(self):
337
 
        transport = get_transport('chroot+memory:///foo/bar')
338
 
        self.assertRaises(PathNotChild, transport.lock_read, '/foo')
339
 
 
340
 
    def test_lock_write(self):
341
 
        transport = get_transport('chroot+memory:///foo/bar')
342
 
        self.assertRaises(PathNotChild, transport.lock_write, '/foo')
343
 
 
344
 
    def test_mkdir(self):
345
 
        transport = get_transport('chroot+memory:///foo/bar')
346
 
        self.assertRaises(PathNotChild, transport.mkdir, '/foo')
347
 
 
348
 
    def test_put_bytes(self):
349
 
        transport = get_transport('chroot+memory:///foo/bar')
350
 
        self.assertRaises(PathNotChild, transport.put_bytes, '/foo', 'bytes')
351
 
 
352
 
    def test_put_file(self):
353
 
        transport = get_transport('chroot+memory:///foo/bar')
354
 
        self.assertRaises(PathNotChild, transport.put_file, '/foo', None)
355
 
 
356
 
    def test_rename(self):
357
 
        transport = get_transport('chroot+memory:///foo/bar')
358
 
        self.assertRaises(PathNotChild, transport.rename, '/aaa', 'bbb')
359
 
        self.assertRaises(PathNotChild, transport.rename, 'ccc', '/d')
360
 
 
361
 
    def test_rmdir(self):
362
 
        transport = get_transport('chroot+memory:///foo/bar')
363
 
        self.assertRaises(PathNotChild, transport.rmdir, '/foo')
364
 
 
365
 
    def test_stat(self):
366
 
        transport = get_transport('chroot+memory:///foo/bar')
367
 
        self.assertRaises(PathNotChild, transport.stat, '/foo')
368
 
 
369
 
 
370
 
class ReadonlyDecoratorTransportTest(TestCase):
371
 
    """Readonly decoration specific tests."""
372
 
 
373
 
    def test_local_parameters(self):
374
 
        import bzrlib.transport.readonly as readonly
375
 
        # connect to . in readonly mode
376
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
377
 
        self.assertEqual(True, transport.listable())
378
 
        self.assertEqual(False, transport.should_cache())
379
 
        self.assertEqual(True, transport.is_readonly())
380
 
 
381
 
    def test_http_parameters(self):
382
 
        from bzrlib.tests.HttpServer import HttpServer
383
 
        import bzrlib.transport.readonly as readonly
384
 
        # connect to . via http which is not listable
385
 
        server = HttpServer()
386
 
        server.setUp()
387
 
        try:
388
 
            transport = get_transport('readonly+' + server.get_url())
389
 
            self.failUnless(isinstance(transport,
390
 
                                       readonly.ReadonlyTransportDecorator))
391
 
            self.assertEqual(False, transport.listable())
392
 
            self.assertEqual(True, transport.should_cache())
393
 
            self.assertEqual(True, transport.is_readonly())
394
 
        finally:
395
 
            server.tearDown()
396
 
 
397
 
 
398
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
399
 
    """NFS decorator specific tests."""
400
 
 
401
 
    def get_nfs_transport(self, url):
402
 
        import bzrlib.transport.fakenfs as fakenfs
403
 
        # connect to url with nfs decoration
404
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
405
 
 
406
 
    def test_local_parameters(self):
407
 
        # the listable, should_cache and is_readonly parameters
408
 
        # are not changed by the fakenfs decorator
409
 
        transport = self.get_nfs_transport('.')
410
 
        self.assertEqual(True, transport.listable())
411
 
        self.assertEqual(False, transport.should_cache())
412
 
        self.assertEqual(False, transport.is_readonly())
413
 
 
414
 
    def test_http_parameters(self):
415
 
        # the listable, should_cache and is_readonly parameters
416
 
        # are not changed by the fakenfs decorator
417
 
        from bzrlib.tests.HttpServer import HttpServer
418
 
        # connect to . via http which is not listable
419
 
        server = HttpServer()
420
 
        server.setUp()
421
 
        try:
422
 
            transport = self.get_nfs_transport(server.get_url())
423
 
            self.assertIsInstance(
424
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
425
 
            self.assertEqual(False, transport.listable())
426
 
            self.assertEqual(True, transport.should_cache())
427
 
            self.assertEqual(True, transport.is_readonly())
428
 
        finally:
429
 
            server.tearDown()
430
 
 
431
 
    def test_fakenfs_server_default(self):
432
 
        # a FakeNFSServer() should bring up a local relpath server for itself
433
 
        import bzrlib.transport.fakenfs as fakenfs
434
 
        server = fakenfs.FakeNFSServer()
435
 
        server.setUp()
436
 
        try:
437
 
            # the url should be decorated appropriately
438
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
439
 
            # and we should be able to get a transport for it
440
 
            transport = get_transport(server.get_url())
441
 
            # which must be a FakeNFSTransportDecorator instance.
442
 
            self.assertIsInstance(
443
 
                transport, fakenfs.FakeNFSTransportDecorator)
444
 
        finally:
445
 
            server.tearDown()
446
 
 
447
 
    def test_fakenfs_rename_semantics(self):
448
 
        # a FakeNFS transport must mangle the way rename errors occur to
449
 
        # look like NFS problems.
450
 
        transport = self.get_nfs_transport('.')
451
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
452
 
                        transport=transport)
453
 
        self.assertRaises(bzrlib.errors.ResourceBusy,
454
 
                          transport.rename, 'from', 'to')
455
 
 
456
 
 
457
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
458
 
    """Tests for simulation of VFAT restrictions"""
459
 
 
460
 
    def get_vfat_transport(self, url):
461
 
        """Return vfat-backed transport for test directory"""
462
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
463
 
        return FakeVFATTransportDecorator('vfat+' + url)
464
 
 
465
 
    def test_transport_creation(self):
466
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
467
 
        transport = self.get_vfat_transport('.')
468
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
469
 
 
470
 
    def test_transport_mkdir(self):
471
 
        transport = self.get_vfat_transport('.')
472
 
        transport.mkdir('HELLO')
473
 
        self.assertTrue(transport.has('hello'))
474
 
        self.assertTrue(transport.has('Hello'))
475
 
 
476
 
    def test_forbidden_chars(self):
477
 
        transport = self.get_vfat_transport('.')
478
 
        self.assertRaises(ValueError, transport.has, "<NU>")
479
 
 
480
 
 
481
 
class BadTransportHandler(Transport):
482
 
    def __init__(self, base_url):
483
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
484
 
 
485
 
 
486
 
class BackupTransportHandler(Transport):
487
 
    """Test transport that works as a backup for the BadTransportHandler"""
488
 
    pass
489
 
 
490
 
 
491
 
class TestTransportImplementation(TestCaseInTempDir):
492
 
    """Implementation verification for transports.
493
 
    
494
 
    To verify a transport we need a server factory, which is a callable
495
 
    that accepts no parameters and returns an implementation of
496
 
    bzrlib.transport.Server.
497
 
    
498
 
    That Server is then used to construct transport instances and test
499
 
    the transport via loopback activity.
500
 
 
501
 
    Currently this assumes that the Transport object is connected to the 
502
 
    current working directory.  So that whatever is done 
503
 
    through the transport, should show up in the working 
504
 
    directory, and vice-versa. This is a bug, because its possible to have
505
 
    URL schemes which provide access to something that may not be 
506
 
    result in storage on the local disk, i.e. due to file system limits, or 
507
 
    due to it being a database or some other non-filesystem tool.
508
 
 
509
 
    This also tests to make sure that the functions work with both
510
 
    generators and lists (assuming iter(list) is effectively a generator)
511
 
    """
512
 
    
513
 
    def setUp(self):
514
 
        super(TestTransportImplementation, self).setUp()
515
 
        self._server = self.transport_server()
516
 
        self._server.setUp()
517
 
 
518
 
    def tearDown(self):
519
 
        super(TestTransportImplementation, self).tearDown()
520
 
        self._server.tearDown()
521
551
        
522
 
    def get_transport(self):
523
 
        """Return a connected transport to the local directory."""
524
 
        base_url = self._server.get_url()
525
 
        # try getting the transport via the regular interface:
526
 
        t = get_transport(base_url)
527
 
        if not isinstance(t, self.transport_class):
528
 
            # we did not get the correct transport class type. Override the
529
 
            # regular connection behaviour by direct construction.
530
 
            t = self.transport_class(base_url)
531
 
        return t
532
 
 
533
 
 
534
 
class TestLocalTransports(TestCase):
535
 
 
536
 
    def test_get_transport_from_abspath(self):
537
 
        here = os.path.abspath('.')
538
 
        t = get_transport(here)
539
 
        self.assertIsInstance(t, LocalTransport)
540
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
541
 
 
542
 
    def test_get_transport_from_relpath(self):
543
 
        here = os.path.abspath('.')
544
 
        t = get_transport('.')
545
 
        self.assertIsInstance(t, LocalTransport)
546
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
547
 
 
548
 
    def test_get_transport_from_local_url(self):
549
 
        here = os.path.abspath('.')
550
 
        here_url = urlutils.local_path_to_url(here) + '/'
551
 
        t = get_transport(here_url)
552
 
        self.assertIsInstance(t, LocalTransport)
553
 
        self.assertEquals(t.base, here_url)