~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_transport.py

Don't encode unicode messages to UTF-8 in mutter() (the stream writer does it).

Use a codec wrapped log file in tests to match the real environment.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2004, 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
 
18
18
import os
19
 
import sys
20
 
import stat
21
19
from cStringIO import StringIO
22
20
 
23
 
import bzrlib
24
 
from bzrlib import (
25
 
    errors,
26
 
    urlutils,
27
 
    )
28
 
from bzrlib.errors import (ConnectionError,
29
 
                           DependencyNotPresent,
30
 
                           FileExists,
31
 
                           InvalidURLJoin,
32
 
                           NoSuchFile,
33
 
                           PathNotChild,
34
 
                           TransportNotPossible,
35
 
                           UnsupportedProtocol,
36
 
                           )
 
21
from bzrlib.errors import (NoSuchFile, FileExists, TransportNotPossible,
 
22
                           ConnectionError)
37
23
from bzrlib.tests import TestCase, TestCaseInTempDir
38
 
from bzrlib.transport import (_CoalescedOffset,
39
 
                              _get_protocol_handlers,
40
 
                              _set_protocol_handlers,
41
 
                              _get_transport_modules,
42
 
                              get_transport,
43
 
                              register_lazy_transport,
44
 
                              register_transport_proto,
45
 
                              _clear_protocol_handlers,
46
 
                              Transport,
47
 
                              )
48
 
from bzrlib.transport.chroot import ChrootServer
49
 
from bzrlib.transport.memory import MemoryTransport
50
 
from bzrlib.transport.local import (LocalTransport,
51
 
                                    EmulatedWin32LocalTransport)
52
 
 
53
 
 
54
 
# TODO: Should possibly split transport-specific tests into their own files.
55
 
 
 
24
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
25
from bzrlib.transport import memory, urlescape
 
26
 
 
27
 
 
28
def _append(fn, txt):
 
29
    """Append the given text (file-like object) to the supplied filename."""
 
30
    f = open(fn, 'ab')
 
31
    f.write(txt)
 
32
    f.flush()
 
33
    f.close()
 
34
    del f
56
35
 
57
36
class TestTransport(TestCase):
58
37
    """Test the non transport-concrete class functionality."""
59
38
 
60
 
    def test__get_set_protocol_handlers(self):
61
 
        handlers = _get_protocol_handlers()
62
 
        self.assertNotEqual([], handlers.keys( ))
63
 
        try:
64
 
            _clear_protocol_handlers()
65
 
            self.assertEqual([], _get_protocol_handlers().keys())
66
 
        finally:
67
 
            _set_protocol_handlers(handlers)
68
 
 
69
 
    def test_get_transport_modules(self):
70
 
        handlers = _get_protocol_handlers()
71
 
        class SampleHandler(object):
72
 
            """I exist, isnt that enough?"""
73
 
        try:
74
 
            _clear_protocol_handlers()
75
 
            register_transport_proto('foo')
76
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
77
 
            register_transport_proto('bar')
78
 
            register_lazy_transport('bar', 'bzrlib.tests.test_transport', 'TestTransport.SampleHandler')
79
 
            self.assertEqual([SampleHandler.__module__, 'bzrlib.transport.chroot'],
80
 
                             _get_transport_modules())
81
 
        finally:
82
 
            _set_protocol_handlers(handlers)
83
 
 
84
 
    def test_transport_dependency(self):
85
 
        """Transport with missing dependency causes no error"""
86
 
        saved_handlers = _get_protocol_handlers()
87
 
        try:
88
 
            register_transport_proto('foo')
89
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
90
 
                    'BadTransportHandler')
91
 
            try:
92
 
                get_transport('foo://fooserver/foo')
93
 
            except UnsupportedProtocol, e:
94
 
                e_str = str(e)
95
 
                self.assertEquals('Unsupported protocol'
96
 
                                  ' for url "foo://fooserver/foo":'
97
 
                                  ' Unable to import library "some_lib":'
98
 
                                  ' testing missing dependency', str(e))
99
 
            else:
100
 
                self.fail('Did not raise UnsupportedProtocol')
101
 
        finally:
102
 
            # restore original values
103
 
            _set_protocol_handlers(saved_handlers)
104
 
            
105
 
    def test_transport_fallback(self):
106
 
        """Transport with missing dependency causes no error"""
107
 
        saved_handlers = _get_protocol_handlers()
108
 
        try:
109
 
            _clear_protocol_handlers()
110
 
            register_transport_proto('foo')
111
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
112
 
                    'BackupTransportHandler')
113
 
            register_lazy_transport('foo', 'bzrlib.tests.test_transport',
114
 
                    'BadTransportHandler')
115
 
            t = get_transport('foo://fooserver/foo')
116
 
            self.assertTrue(isinstance(t, BackupTransportHandler))
117
 
        finally:
118
 
            _set_protocol_handlers(saved_handlers)
119
 
 
120
 
    def test__combine_paths(self):
121
 
        t = Transport('/')
122
 
        self.assertEqual('/home/sarah/project/foo',
123
 
                         t._combine_paths('/home/sarah', 'project/foo'))
124
 
        self.assertEqual('/etc',
125
 
                         t._combine_paths('/home/sarah', '../../etc'))
126
 
        self.assertEqual('/etc',
127
 
                         t._combine_paths('/home/sarah', '../../../etc'))
128
 
        self.assertEqual('/etc',
129
 
                         t._combine_paths('/home/sarah', '/etc'))
130
 
 
131
 
    def test_local_abspath_non_local_transport(self):
132
 
        # the base implementation should throw
133
 
        t = MemoryTransport()
134
 
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
135
 
        self.assertEqual('memory:///t is not a local path.', str(e))
136
 
 
137
 
 
138
 
class TestCoalesceOffsets(TestCase):
139
 
    
140
 
    def check(self, expected, offsets, limit=0, fudge=0):
141
 
        coalesce = Transport._coalesce_offsets
142
 
        exp = [_CoalescedOffset(*x) for x in expected]
143
 
        out = list(coalesce(offsets, limit=limit, fudge_factor=fudge))
144
 
        self.assertEqual(exp, out)
145
 
 
146
 
    def test_coalesce_empty(self):
147
 
        self.check([], [])
148
 
 
149
 
    def test_coalesce_simple(self):
150
 
        self.check([(0, 10, [(0, 10)])], [(0, 10)])
151
 
 
152
 
    def test_coalesce_unrelated(self):
153
 
        self.check([(0, 10, [(0, 10)]),
154
 
                    (20, 10, [(0, 10)]),
155
 
                   ], [(0, 10), (20, 10)])
156
 
            
157
 
    def test_coalesce_unsorted(self):
158
 
        self.check([(20, 10, [(0, 10)]),
159
 
                    (0, 10, [(0, 10)]),
160
 
                   ], [(20, 10), (0, 10)])
161
 
 
162
 
    def test_coalesce_nearby(self):
163
 
        self.check([(0, 20, [(0, 10), (10, 10)])],
164
 
                   [(0, 10), (10, 10)])
165
 
 
166
 
    def test_coalesce_overlapped(self):
167
 
        self.check([(0, 15, [(0, 10), (5, 10)])],
168
 
                   [(0, 10), (5, 10)])
169
 
 
170
 
    def test_coalesce_limit(self):
171
 
        self.check([(10, 50, [(0, 10), (10, 10), (20, 10),
172
 
                              (30, 10), (40, 10)]),
173
 
                    (60, 50, [(0, 10), (10, 10), (20, 10),
174
 
                              (30, 10), (40, 10)]),
175
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
176
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
177
 
                       (90, 10), (100, 10)],
178
 
                    limit=5)
179
 
 
180
 
    def test_coalesce_no_limit(self):
181
 
        self.check([(10, 100, [(0, 10), (10, 10), (20, 10),
182
 
                               (30, 10), (40, 10), (50, 10),
183
 
                               (60, 10), (70, 10), (80, 10),
184
 
                               (90, 10)]),
185
 
                   ], [(10, 10), (20, 10), (30, 10), (40, 10),
186
 
                       (50, 10), (60, 10), (70, 10), (80, 10),
187
 
                       (90, 10), (100, 10)])
188
 
 
189
 
    def test_coalesce_fudge(self):
190
 
        self.check([(10, 30, [(0, 10), (20, 10)]),
191
 
                    (100, 10, [(0, 10),]),
192
 
                   ], [(10, 10), (30, 10), (100, 10)],
193
 
                   fudge=10
194
 
                  )
 
39
    def test_urlescape(self):
 
40
        self.assertEqual('%25', urlescape('%'))
 
41
 
 
42
 
 
43
class TestTransportMixIn(object):
 
44
    """Subclass this, and it will provide a series of tests for a Transport.
 
45
    It assumes that the Transport object is connected to the 
 
46
    current working directory.  So that whatever is done 
 
47
    through the transport, should show up in the working 
 
48
    directory, and vice-versa.
 
49
 
 
50
    This also tests to make sure that the functions work with both
 
51
    generators and lists (assuming iter(list) is effectively a generator)
 
52
    """
 
53
    readonly = False
 
54
    def get_transport(self):
 
55
        """Children should override this to return the Transport object.
 
56
        """
 
57
        raise NotImplementedError
 
58
 
 
59
    def test_has(self):
 
60
        t = self.get_transport()
 
61
 
 
62
        files = ['a', 'b', 'e', 'g', '%']
 
63
        self.build_tree(files)
 
64
        self.assertEqual(t.has('a'), True)
 
65
        self.assertEqual(t.has('c'), False)
 
66
        self.assertEqual(t.has(urlescape('%')), True)
 
67
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
 
68
                [True, True, False, False, True, False, True, False])
 
69
        self.assertEqual(t.has_any(['a', 'b', 'c']), True)
 
70
        self.assertEqual(t.has_any(['c', 'd', 'f', urlescape('%%')]), False)
 
71
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
 
72
                [True, True, False, False, True, False, True, False])
 
73
        self.assertEqual(t.has_any(['c', 'c', 'c']), False)
 
74
        self.assertEqual(t.has_any(['b', 'b', 'b']), True)
 
75
 
 
76
    def test_get(self):
 
77
        t = self.get_transport()
 
78
 
 
79
        files = ['a', 'b', 'e', 'g']
 
80
        self.build_tree(files)
 
81
        self.assertEqual(t.get('a').read(), open('a').read())
 
82
        content_f = t.get_multi(files)
 
83
        for path,f in zip(files, content_f):
 
84
            self.assertEqual(open(path).read(), f.read())
 
85
 
 
86
        content_f = t.get_multi(iter(files))
 
87
        for path,f in zip(files, content_f):
 
88
            self.assertEqual(open(path).read(), f.read())
 
89
 
 
90
        self.assertRaises(NoSuchFile, t.get, 'c')
 
91
        try:
 
92
            files = list(t.get_multi(['a', 'b', 'c']))
 
93
        except NoSuchFile:
 
94
            pass
 
95
        else:
 
96
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
97
        try:
 
98
            files = list(t.get_multi(iter(['a', 'b', 'c', 'e'])))
 
99
        except NoSuchFile:
 
100
            pass
 
101
        else:
 
102
            self.fail('Failed to raise NoSuchFile for missing file in get_multi')
 
103
 
 
104
    def test_put(self):
 
105
        t = self.get_transport()
 
106
 
 
107
        if self.readonly:
 
108
            self.assertRaises(TransportNotPossible,
 
109
                    t.put, 'a', 'some text for a\n')
 
110
            open('a', 'wb').write('some text for a\n')
 
111
        else:
 
112
            t.put('a', 'some text for a\n')
 
113
        self.assert_(os.path.exists('a'))
 
114
        self.check_file_contents('a', 'some text for a\n')
 
115
        self.assertEqual(t.get('a').read(), 'some text for a\n')
 
116
        # Make sure 'has' is updated
 
117
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
118
                [True, False, False, False, False])
 
119
        if self.readonly:
 
120
            self.assertRaises(TransportNotPossible,
 
121
                    t.put_multi,
 
122
                    [('a', 'new\ncontents for\na\n'),
 
123
                        ('d', 'contents\nfor d\n')])
 
124
            open('a', 'wb').write('new\ncontents for\na\n')
 
125
            open('d', 'wb').write('contents\nfor d\n')
 
126
        else:
 
127
            # Put also replaces contents
 
128
            self.assertEqual(t.put_multi([('a', 'new\ncontents for\na\n'),
 
129
                                          ('d', 'contents\nfor d\n')]),
 
130
                             2)
 
131
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e'])),
 
132
                [True, False, False, True, False])
 
133
        self.check_file_contents('a', 'new\ncontents for\na\n')
 
134
        self.check_file_contents('d', 'contents\nfor d\n')
 
135
 
 
136
        if self.readonly:
 
137
            self.assertRaises(TransportNotPossible,
 
138
                t.put_multi, iter([('a', 'diff\ncontents for\na\n'),
 
139
                                  ('d', 'another contents\nfor d\n')]))
 
140
            open('a', 'wb').write('diff\ncontents for\na\n')
 
141
            open('d', 'wb').write('another contents\nfor d\n')
 
142
        else:
 
143
            self.assertEqual(
 
144
                t.put_multi(iter([('a', 'diff\ncontents for\na\n'),
 
145
                                  ('d', 'another contents\nfor d\n')]))
 
146
                             , 2)
 
147
        self.check_file_contents('a', 'diff\ncontents for\na\n')
 
148
        self.check_file_contents('d', 'another contents\nfor d\n')
 
149
 
 
150
        if self.readonly:
 
151
            self.assertRaises(TransportNotPossible,
 
152
                    t.put, 'path/doesnt/exist/c', 'contents')
 
153
        else:
 
154
            self.assertRaises(NoSuchFile,
 
155
                    t.put, 'path/doesnt/exist/c', 'contents')
 
156
 
 
157
    def test_put_file(self):
 
158
        t = self.get_transport()
 
159
 
 
160
        # Test that StringIO can be used as a file-like object with put
 
161
        f1 = StringIO('this is a string\nand some more stuff\n')
 
162
        if self.readonly:
 
163
            open('f1', 'wb').write(f1.read())
 
164
        else:
 
165
            t.put('f1', f1)
 
166
 
 
167
        del f1
 
168
 
 
169
        self.check_file_contents('f1', 
 
170
                'this is a string\nand some more stuff\n')
 
171
 
 
172
        f2 = StringIO('here is some text\nand a bit more\n')
 
173
        f3 = StringIO('some text for the\nthird file created\n')
 
174
 
 
175
        if self.readonly:
 
176
            open('f2', 'wb').write(f2.read())
 
177
            open('f3', 'wb').write(f3.read())
 
178
        else:
 
179
            t.put_multi([('f2', f2), ('f3', f3)])
 
180
 
 
181
        del f2, f3
 
182
 
 
183
        self.check_file_contents('f2', 'here is some text\nand a bit more\n')
 
184
        self.check_file_contents('f3', 'some text for the\nthird file created\n')
 
185
 
 
186
        # Test that an actual file object can be used with put
 
187
        f4 = open('f1', 'rb')
 
188
        if self.readonly:
 
189
            open('f4', 'wb').write(f4.read())
 
190
        else:
 
191
            t.put('f4', f4)
 
192
 
 
193
        del f4
 
194
 
 
195
        self.check_file_contents('f4', 
 
196
                'this is a string\nand some more stuff\n')
 
197
 
 
198
        f5 = open('f2', 'rb')
 
199
        f6 = open('f3', 'rb')
 
200
        if self.readonly:
 
201
            open('f5', 'wb').write(f5.read())
 
202
            open('f6', 'wb').write(f6.read())
 
203
        else:
 
204
            t.put_multi([('f5', f5), ('f6', f6)])
 
205
 
 
206
        del f5, f6
 
207
 
 
208
        self.check_file_contents('f5', 'here is some text\nand a bit more\n')
 
209
        self.check_file_contents('f6', 'some text for the\nthird file created\n')
 
210
 
 
211
 
 
212
 
 
213
    def test_mkdir(self):
 
214
        t = self.get_transport()
 
215
 
 
216
        # Test mkdir
 
217
        os.mkdir('dir_a')
 
218
        self.assertEqual(t.has('dir_a'), True)
 
219
        self.assertEqual(t.has('dir_b'), False)
 
220
 
 
221
        if self.readonly:
 
222
            self.assertRaises(TransportNotPossible,
 
223
                    t.mkdir, 'dir_b')
 
224
            os.mkdir('dir_b')
 
225
        else:
 
226
            t.mkdir('dir_b')
 
227
        self.assertEqual(t.has('dir_b'), True)
 
228
        self.assert_(os.path.isdir('dir_b'))
 
229
 
 
230
        if self.readonly:
 
231
            self.assertRaises(TransportNotPossible,
 
232
                    t.mkdir_multi, ['dir_c', 'dir_d'])
 
233
            os.mkdir('dir_c')
 
234
            os.mkdir('dir_d')
 
235
        else:
 
236
            t.mkdir_multi(['dir_c', 'dir_d'])
 
237
 
 
238
        if self.readonly:
 
239
            self.assertRaises(TransportNotPossible,
 
240
                    t.mkdir_multi, iter(['dir_e', 'dir_f']))
 
241
            os.mkdir('dir_e')
 
242
            os.mkdir('dir_f')
 
243
        else:
 
244
            t.mkdir_multi(iter(['dir_e', 'dir_f']))
 
245
        self.assertEqual(list(t.has_multi(
 
246
            ['dir_a', 'dir_b', 'dir_c', 'dir_q',
 
247
             'dir_d', 'dir_e', 'dir_f', 'dir_b'])),
 
248
            [True, True, True, False,
 
249
             True, True, True, True])
 
250
        for d in ['dir_a', 'dir_b', 'dir_c', 'dir_d', 'dir_e', 'dir_f']:
 
251
            self.assert_(os.path.isdir(d))
 
252
 
 
253
        if not self.readonly:
 
254
            self.assertRaises(NoSuchFile, t.mkdir, 'path/doesnt/exist')
 
255
            self.assertRaises(FileExists, t.mkdir, 'dir_a') # Creating a directory again should fail
 
256
 
 
257
        # Make sure the transport recognizes when a
 
258
        # directory is created by other means
 
259
        # Caching Transports will fail, because dir_e was already seen not
 
260
        # to exist. So instead, we will search for a new directory
 
261
        #os.mkdir('dir_e')
 
262
        #if not self.readonly:
 
263
        #    self.assertRaises(FileExists, t.mkdir, 'dir_e')
 
264
 
 
265
        os.mkdir('dir_g')
 
266
        if not self.readonly:
 
267
            self.assertRaises(FileExists, t.mkdir, 'dir_g')
 
268
 
 
269
        # Test get/put in sub-directories
 
270
        if self.readonly:
 
271
            open('dir_a/a', 'wb').write('contents of dir_a/a')
 
272
            open('dir_b/b', 'wb').write('contents of dir_b/b')
 
273
        else:
 
274
            self.assertEqual(
 
275
                t.put_multi([('dir_a/a', 'contents of dir_a/a'),
 
276
                             ('dir_b/b', 'contents of dir_b/b')])
 
277
                          , 2)
 
278
        for f in ('dir_a/a', 'dir_b/b'):
 
279
            self.assertEqual(t.get(f).read(), open(f).read())
 
280
 
 
281
    def test_copy_to(self):
 
282
        import tempfile
 
283
        from bzrlib.transport.local import LocalTransport
 
284
 
 
285
        t = self.get_transport()
 
286
 
 
287
        files = ['a', 'b', 'c', 'd']
 
288
        self.build_tree(files)
 
289
 
 
290
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
291
        dtmp_base = os.path.basename(dtmp)
 
292
        local_t = LocalTransport(dtmp)
 
293
 
 
294
        t.copy_to(files, local_t)
 
295
        for f in files:
 
296
            self.assertEquals(open(f).read(),
 
297
                    open(os.path.join(dtmp_base, f)).read())
 
298
 
 
299
        # Test that copying into a missing directory raises
 
300
        # NoSuchFile
 
301
        os.mkdir('e')
 
302
        open('e/f', 'wb').write('contents of e')
 
303
        self.assertRaises(NoSuchFile, t.copy_to, ['e/f'], local_t)
 
304
 
 
305
        os.mkdir(os.path.join(dtmp_base, 'e'))
 
306
        t.copy_to(['e/f'], local_t)
 
307
 
 
308
        del dtmp, dtmp_base, local_t
 
309
 
 
310
        dtmp = tempfile.mkdtemp(dir=u'.', prefix='test-transport-')
 
311
        dtmp_base = os.path.basename(dtmp)
 
312
        local_t = LocalTransport(dtmp)
 
313
 
 
314
        files = ['a', 'b', 'c', 'd']
 
315
        t.copy_to(iter(files), local_t)
 
316
        for f in files:
 
317
            self.assertEquals(open(f).read(),
 
318
                    open(os.path.join(dtmp_base, f)).read())
 
319
 
 
320
        del dtmp, dtmp_base, local_t
 
321
 
 
322
    def test_append(self):
 
323
        t = self.get_transport()
 
324
 
 
325
        if self.readonly:
 
326
            open('a', 'wb').write('diff\ncontents for\na\n')
 
327
            open('b', 'wb').write('contents\nfor b\n')
 
328
        else:
 
329
            t.put_multi([
 
330
                    ('a', 'diff\ncontents for\na\n'),
 
331
                    ('b', 'contents\nfor b\n')
 
332
                    ])
 
333
 
 
334
        if self.readonly:
 
335
            self.assertRaises(TransportNotPossible,
 
336
                    t.append, 'a', 'add\nsome\nmore\ncontents\n')
 
337
            _append('a', 'add\nsome\nmore\ncontents\n')
 
338
        else:
 
339
            t.append('a', 'add\nsome\nmore\ncontents\n')
 
340
 
 
341
        self.check_file_contents('a', 
 
342
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n')
 
343
 
 
344
        if self.readonly:
 
345
            self.assertRaises(TransportNotPossible,
 
346
                    t.append_multi,
 
347
                        [('a', 'and\nthen\nsome\nmore\n'),
 
348
                         ('b', 'some\nmore\nfor\nb\n')])
 
349
            _append('a', 'and\nthen\nsome\nmore\n')
 
350
            _append('b', 'some\nmore\nfor\nb\n')
 
351
        else:
 
352
            t.append_multi([('a', 'and\nthen\nsome\nmore\n'),
 
353
                    ('b', 'some\nmore\nfor\nb\n')])
 
354
        self.check_file_contents('a', 
 
355
            'diff\ncontents for\na\n'
 
356
            'add\nsome\nmore\ncontents\n'
 
357
            'and\nthen\nsome\nmore\n')
 
358
        self.check_file_contents('b', 
 
359
                'contents\nfor b\n'
 
360
                'some\nmore\nfor\nb\n')
 
361
 
 
362
        if self.readonly:
 
363
            _append('a', 'a little bit more\n')
 
364
            _append('b', 'from an iterator\n')
 
365
        else:
 
366
            t.append_multi(iter([('a', 'a little bit more\n'),
 
367
                    ('b', 'from an iterator\n')]))
 
368
        self.check_file_contents('a', 
 
369
            'diff\ncontents for\na\n'
 
370
            'add\nsome\nmore\ncontents\n'
 
371
            'and\nthen\nsome\nmore\n'
 
372
            'a little bit more\n')
 
373
        self.check_file_contents('b', 
 
374
                'contents\nfor b\n'
 
375
                'some\nmore\nfor\nb\n'
 
376
                'from an iterator\n')
 
377
 
 
378
    def test_append_file(self):
 
379
        t = self.get_transport()
 
380
 
 
381
        contents = [
 
382
            ('f1', 'this is a string\nand some more stuff\n'),
 
383
            ('f2', 'here is some text\nand a bit more\n'),
 
384
            ('f3', 'some text for the\nthird file created\n'),
 
385
            ('f4', 'this is a string\nand some more stuff\n'),
 
386
            ('f5', 'here is some text\nand a bit more\n'),
 
387
            ('f6', 'some text for the\nthird file created\n')
 
388
        ]
 
389
        
 
390
        if self.readonly:
 
391
            for f, val in contents:
 
392
                open(f, 'wb').write(val)
 
393
        else:
 
394
            t.put_multi(contents)
 
395
 
 
396
        a1 = StringIO('appending to\none\n')
 
397
        if self.readonly:
 
398
            _append('f1', a1.read())
 
399
        else:
 
400
            t.append('f1', a1)
 
401
 
 
402
        del a1
 
403
 
 
404
        self.check_file_contents('f1', 
 
405
                'this is a string\nand some more stuff\n'
 
406
                'appending to\none\n')
 
407
 
 
408
        a2 = StringIO('adding more\ntext to two\n')
 
409
        a3 = StringIO('some garbage\nto put in three\n')
 
410
 
 
411
        if self.readonly:
 
412
            _append('f2', a2.read())
 
413
            _append('f3', a3.read())
 
414
        else:
 
415
            t.append_multi([('f2', a2), ('f3', a3)])
 
416
 
 
417
        del a2, a3
 
418
 
 
419
        self.check_file_contents('f2',
 
420
                'here is some text\nand a bit more\n'
 
421
                'adding more\ntext to two\n')
 
422
        self.check_file_contents('f3', 
 
423
                'some text for the\nthird file created\n'
 
424
                'some garbage\nto put in three\n')
 
425
 
 
426
        # Test that an actual file object can be used with put
 
427
        a4 = open('f1', 'rb')
 
428
        if self.readonly:
 
429
            _append('f4', a4.read())
 
430
        else:
 
431
            t.append('f4', a4)
 
432
 
 
433
        del a4
 
434
 
 
435
        self.check_file_contents('f4', 
 
436
                'this is a string\nand some more stuff\n'
 
437
                'this is a string\nand some more stuff\n'
 
438
                'appending to\none\n')
 
439
 
 
440
        a5 = open('f2', 'rb')
 
441
        a6 = open('f3', 'rb')
 
442
        if self.readonly:
 
443
            _append('f5', a5.read())
 
444
            _append('f6', a6.read())
 
445
        else:
 
446
            t.append_multi([('f5', a5), ('f6', a6)])
 
447
 
 
448
        del a5, a6
 
449
 
 
450
        self.check_file_contents('f5',
 
451
                'here is some text\nand a bit more\n'
 
452
                'here is some text\nand a bit more\n'
 
453
                'adding more\ntext to two\n')
 
454
        self.check_file_contents('f6',
 
455
                'some text for the\nthird file created\n'
 
456
                'some text for the\nthird file created\n'
 
457
                'some garbage\nto put in three\n')
 
458
 
 
459
    def test_delete(self):
 
460
        # TODO: Test Transport.delete
 
461
        pass
 
462
 
 
463
    def test_move(self):
 
464
        # TODO: Test Transport.move
 
465
        pass
 
466
 
 
467
    def test_connection_error(self):
 
468
        """ConnectionError is raised when connection is impossible"""
 
469
        if not hasattr(self, "get_bogus_transport"):
 
470
            return
 
471
        t = self.get_bogus_transport()
 
472
        try:
 
473
            t.get('.bzr/branch')
 
474
        except (ConnectionError, NoSuchFile), e:
 
475
            pass
 
476
        except (Exception), e:
 
477
            self.failIf(True, 'Wrong exception thrown: %s' % e)
 
478
        else:
 
479
            self.failIf(True, 'Did not get the expected exception.')
 
480
 
 
481
        
 
482
class LocalTransportTest(TestCaseInTempDir, TestTransportMixIn):
 
483
    def get_transport(self):
 
484
        from bzrlib.transport.local import LocalTransport
 
485
        return LocalTransport(u'.')
 
486
 
 
487
 
 
488
class HttpTransportTest(TestCaseWithWebserver, TestTransportMixIn):
 
489
 
 
490
    readonly = True
 
491
 
 
492
    def get_transport(self):
 
493
        from bzrlib.transport.http import HttpTransport
 
494
        url = self.get_remote_url(u'.')
 
495
        return HttpTransport(url)
 
496
 
 
497
    def get_bogus_transport(self):
 
498
        from bzrlib.transport.http import HttpTransport
 
499
        return HttpTransport('http://jasldkjsalkdjalksjdkljasd')
195
500
 
196
501
 
197
502
class TestMemoryTransport(TestCase):
198
503
 
199
504
    def test_get_transport(self):
200
 
        MemoryTransport()
 
505
        memory.MemoryTransport()
201
506
 
202
507
    def test_clone(self):
203
 
        transport = MemoryTransport()
204
 
        self.assertTrue(isinstance(transport, MemoryTransport))
205
 
        self.assertEqual("memory:///", transport.clone("/").base)
 
508
        transport = memory.MemoryTransport()
 
509
        self.failUnless(transport.clone() is transport)
206
510
 
207
511
    def test_abspath(self):
208
 
        transport = MemoryTransport()
209
 
        self.assertEqual("memory:///relpath", transport.abspath('relpath'))
210
 
 
211
 
    def test_abspath_of_root(self):
212
 
        transport = MemoryTransport()
213
 
        self.assertEqual("memory:///", transport.base)
214
 
        self.assertEqual("memory:///", transport.abspath('/'))
215
 
 
216
 
    def test_abspath_of_relpath_starting_at_root(self):
217
 
        transport = MemoryTransport()
218
 
        self.assertEqual("memory:///foo", transport.abspath('/foo'))
 
512
        transport = memory.MemoryTransport()
 
513
        self.assertEqual("in-memory:relpath", transport.abspath('relpath'))
 
514
 
 
515
    def test_relpath(self):
 
516
        transport = memory.MemoryTransport()
219
517
 
220
518
    def test_append_and_get(self):
221
 
        transport = MemoryTransport()
222
 
        transport.append_bytes('path', 'content')
 
519
        transport = memory.MemoryTransport()
 
520
        transport.append('path', StringIO('content'))
223
521
        self.assertEqual(transport.get('path').read(), 'content')
224
 
        transport.append_file('path', StringIO('content'))
 
522
        transport.append('path', StringIO('content'))
225
523
        self.assertEqual(transport.get('path').read(), 'contentcontent')
226
524
 
227
525
    def test_put_and_get(self):
228
 
        transport = MemoryTransport()
229
 
        transport.put_file('path', StringIO('content'))
 
526
        transport = memory.MemoryTransport()
 
527
        transport.put('path', StringIO('content'))
230
528
        self.assertEqual(transport.get('path').read(), 'content')
231
 
        transport.put_bytes('path', 'content')
 
529
        transport.put('path', StringIO('content'))
232
530
        self.assertEqual(transport.get('path').read(), 'content')
233
531
 
234
532
    def test_append_without_dir_fails(self):
235
 
        transport = MemoryTransport()
 
533
        transport = memory.MemoryTransport()
236
534
        self.assertRaises(NoSuchFile,
237
 
                          transport.append_bytes, 'dir/path', 'content')
 
535
                          transport.append, 'dir/path', StringIO('content'))
238
536
 
239
537
    def test_put_without_dir_fails(self):
240
 
        transport = MemoryTransport()
 
538
        transport = memory.MemoryTransport()
241
539
        self.assertRaises(NoSuchFile,
242
 
                          transport.put_file, 'dir/path', StringIO('content'))
 
540
                          transport.put, 'dir/path', StringIO('content'))
243
541
 
244
542
    def test_get_missing(self):
245
 
        transport = MemoryTransport()
 
543
        transport = memory.MemoryTransport()
246
544
        self.assertRaises(NoSuchFile, transport.get, 'foo')
247
545
 
248
546
    def test_has_missing(self):
249
 
        transport = MemoryTransport()
 
547
        transport = memory.MemoryTransport()
250
548
        self.assertEquals(False, transport.has('foo'))
251
549
 
252
550
    def test_has_present(self):
253
 
        transport = MemoryTransport()
254
 
        transport.append_bytes('foo', 'content')
 
551
        transport = memory.MemoryTransport()
 
552
        transport.append('foo', StringIO('content'))
255
553
        self.assertEquals(True, transport.has('foo'))
256
554
 
257
 
    def test_list_dir(self):
258
 
        transport = MemoryTransport()
259
 
        transport.put_bytes('foo', 'content')
260
 
        transport.mkdir('dir')
261
 
        transport.put_bytes('dir/subfoo', 'content')
262
 
        transport.put_bytes('dirlike', 'content')
263
 
 
264
 
        self.assertEquals(['dir', 'dirlike', 'foo'], sorted(transport.list_dir('.')))
265
 
        self.assertEquals(['subfoo'], sorted(transport.list_dir('dir')))
266
 
 
267
555
    def test_mkdir(self):
268
 
        transport = MemoryTransport()
 
556
        transport = memory.MemoryTransport()
269
557
        transport.mkdir('dir')
270
 
        transport.append_bytes('dir/path', 'content')
 
558
        transport.append('dir/path', StringIO('content'))
271
559
        self.assertEqual(transport.get('dir/path').read(), 'content')
272
560
 
273
561
    def test_mkdir_missing_parent(self):
274
 
        transport = MemoryTransport()
 
562
        transport = memory.MemoryTransport()
275
563
        self.assertRaises(NoSuchFile,
276
564
                          transport.mkdir, 'dir/dir')
277
565
 
278
566
    def test_mkdir_twice(self):
279
 
        transport = MemoryTransport()
 
567
        transport = memory.MemoryTransport()
280
568
        transport.mkdir('dir')
281
569
        self.assertRaises(FileExists, transport.mkdir, 'dir')
282
570
 
283
571
    def test_parameters(self):
284
 
        transport = MemoryTransport()
 
572
        transport = memory.MemoryTransport()
285
573
        self.assertEqual(True, transport.listable())
286
574
        self.assertEqual(False, transport.should_cache())
287
 
        self.assertEqual(False, transport.is_readonly())
288
575
 
289
576
    def test_iter_files_recursive(self):
290
 
        transport = MemoryTransport()
 
577
        transport = memory.MemoryTransport()
291
578
        transport.mkdir('dir')
292
 
        transport.put_bytes('dir/foo', 'content')
293
 
        transport.put_bytes('dir/bar', 'content')
294
 
        transport.put_bytes('bar', 'content')
 
579
        transport.put('dir/foo', StringIO('content'))
 
580
        transport.put('dir/bar', StringIO('content'))
 
581
        transport.put('bar', StringIO('content'))
295
582
        paths = set(transport.iter_files_recursive())
296
583
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)
297
584
 
298
585
    def test_stat(self):
299
 
        transport = MemoryTransport()
300
 
        transport.put_bytes('foo', 'content')
301
 
        transport.put_bytes('bar', 'phowar')
 
586
        transport = memory.MemoryTransport()
 
587
        transport.put('foo', StringIO('content'))
 
588
        transport.put('bar', StringIO('phowar'))
302
589
        self.assertEqual(7, transport.stat('foo').st_size)
303
590
        self.assertEqual(6, transport.stat('bar').st_size)
304
591
 
305
 
 
306
 
class ChrootDecoratorTransportTest(TestCase):
307
 
    """Chroot decoration specific tests."""
308
 
 
309
 
    def test_abspath(self):
310
 
        # The abspath is always relative to the chroot_url.
311
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
312
 
        server.setUp()
313
 
        transport = get_transport(server.get_url())
314
 
        self.assertEqual(server.get_url(), transport.abspath('/'))
315
 
 
316
 
        subdir_transport = transport.clone('subdir')
317
 
        self.assertEqual(server.get_url(), subdir_transport.abspath('/'))
318
 
        server.tearDown()
319
 
 
320
 
    def test_clone(self):
321
 
        server = ChrootServer(get_transport('memory:///foo/bar/'))
322
 
        server.setUp()
323
 
        transport = get_transport(server.get_url())
324
 
        # relpath from root and root path are the same
325
 
        relpath_cloned = transport.clone('foo')
326
 
        abspath_cloned = transport.clone('/foo')
327
 
        self.assertEqual(server, relpath_cloned.server)
328
 
        self.assertEqual(server, abspath_cloned.server)
329
 
        server.tearDown()
330
 
    
331
 
    def test_chroot_url_preserves_chroot(self):
332
 
        """Calling get_transport on a chroot transport's base should produce a
333
 
        transport with exactly the same behaviour as the original chroot
334
 
        transport.
335
 
 
336
 
        This is so that it is not possible to escape a chroot by doing::
337
 
            url = chroot_transport.base
338
 
            parent_url = urlutils.join(url, '..')
339
 
            new_transport = get_transport(parent_url)
340
 
        """
341
 
        server = ChrootServer(get_transport('memory:///path/subpath'))
342
 
        server.setUp()
343
 
        transport = get_transport(server.get_url())
344
 
        new_transport = get_transport(transport.base)
345
 
        self.assertEqual(transport.server, new_transport.server)
346
 
        self.assertEqual(transport.base, new_transport.base)
347
 
        server.tearDown()
348
 
        
349
 
    def test_urljoin_preserves_chroot(self):
350
 
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
351
 
        URL that escapes the intended chroot.
352
 
 
353
 
        This is so that it is not possible to escape a chroot by doing::
354
 
            url = chroot_transport.base
355
 
            parent_url = urlutils.join(url, '..')
356
 
            new_transport = get_transport(parent_url)
357
 
        """
358
 
        server = ChrootServer(get_transport('memory:///path/'))
359
 
        server.setUp()
360
 
        transport = get_transport(server.get_url())
361
 
        self.assertRaises(
362
 
            InvalidURLJoin, urlutils.join, transport.base, '..')
363
 
        server.tearDown()
364
 
 
365
 
 
366
 
class ChrootServerTest(TestCase):
367
 
 
368
 
    def test_construct(self):
369
 
        backing_transport = MemoryTransport()
370
 
        server = ChrootServer(backing_transport)
371
 
        self.assertEqual(backing_transport, server.backing_transport)
372
 
 
373
 
    def test_setUp(self):
374
 
        backing_transport = MemoryTransport()
375
 
        server = ChrootServer(backing_transport)
376
 
        server.setUp()
377
 
        self.assertTrue(server.scheme in _get_protocol_handlers().keys())
378
 
 
379
 
    def test_tearDown(self):
380
 
        backing_transport = MemoryTransport()
381
 
        server = ChrootServer(backing_transport)
382
 
        server.setUp()
383
 
        server.tearDown()
384
 
        self.assertFalse(server.scheme in _get_protocol_handlers().keys())
385
 
 
386
 
    def test_get_url(self):
387
 
        backing_transport = MemoryTransport()
388
 
        server = ChrootServer(backing_transport)
389
 
        server.setUp()
390
 
        self.assertEqual('chroot-%d:///' % id(server), server.get_url())
391
 
        server.tearDown()
392
 
 
393
 
 
394
 
class ReadonlyDecoratorTransportTest(TestCase):
395
 
    """Readonly decoration specific tests."""
396
 
 
397
 
    def test_local_parameters(self):
398
 
        import bzrlib.transport.readonly as readonly
399
 
        # connect to . in readonly mode
400
 
        transport = readonly.ReadonlyTransportDecorator('readonly+.')
401
 
        self.assertEqual(True, transport.listable())
402
 
        self.assertEqual(False, transport.should_cache())
403
 
        self.assertEqual(True, transport.is_readonly())
404
 
 
405
 
    def test_http_parameters(self):
406
 
        from bzrlib.tests.HttpServer import HttpServer
407
 
        import bzrlib.transport.readonly as readonly
408
 
        # connect to . via http which is not listable
409
 
        server = HttpServer()
410
 
        server.setUp()
411
 
        try:
412
 
            transport = get_transport('readonly+' + server.get_url())
413
 
            self.failUnless(isinstance(transport,
414
 
                                       readonly.ReadonlyTransportDecorator))
415
 
            self.assertEqual(False, transport.listable())
416
 
            self.assertEqual(True, transport.should_cache())
417
 
            self.assertEqual(True, transport.is_readonly())
418
 
        finally:
419
 
            server.tearDown()
420
 
 
421
 
 
422
 
class FakeNFSDecoratorTests(TestCaseInTempDir):
423
 
    """NFS decorator specific tests."""
424
 
 
425
 
    def get_nfs_transport(self, url):
426
 
        import bzrlib.transport.fakenfs as fakenfs
427
 
        # connect to url with nfs decoration
428
 
        return fakenfs.FakeNFSTransportDecorator('fakenfs+' + url)
429
 
 
430
 
    def test_local_parameters(self):
431
 
        # the listable, should_cache and is_readonly parameters
432
 
        # are not changed by the fakenfs decorator
433
 
        transport = self.get_nfs_transport('.')
434
 
        self.assertEqual(True, transport.listable())
435
 
        self.assertEqual(False, transport.should_cache())
436
 
        self.assertEqual(False, transport.is_readonly())
437
 
 
438
 
    def test_http_parameters(self):
439
 
        # the listable, should_cache and is_readonly parameters
440
 
        # are not changed by the fakenfs decorator
441
 
        from bzrlib.tests.HttpServer import HttpServer
442
 
        # connect to . via http which is not listable
443
 
        server = HttpServer()
444
 
        server.setUp()
445
 
        try:
446
 
            transport = self.get_nfs_transport(server.get_url())
447
 
            self.assertIsInstance(
448
 
                transport, bzrlib.transport.fakenfs.FakeNFSTransportDecorator)
449
 
            self.assertEqual(False, transport.listable())
450
 
            self.assertEqual(True, transport.should_cache())
451
 
            self.assertEqual(True, transport.is_readonly())
452
 
        finally:
453
 
            server.tearDown()
454
 
 
455
 
    def test_fakenfs_server_default(self):
456
 
        # a FakeNFSServer() should bring up a local relpath server for itself
457
 
        import bzrlib.transport.fakenfs as fakenfs
458
 
        server = fakenfs.FakeNFSServer()
459
 
        server.setUp()
460
 
        try:
461
 
            # the url should be decorated appropriately
462
 
            self.assertStartsWith(server.get_url(), 'fakenfs+')
463
 
            # and we should be able to get a transport for it
464
 
            transport = get_transport(server.get_url())
465
 
            # which must be a FakeNFSTransportDecorator instance.
466
 
            self.assertIsInstance(
467
 
                transport, fakenfs.FakeNFSTransportDecorator)
468
 
        finally:
469
 
            server.tearDown()
470
 
 
471
 
    def test_fakenfs_rename_semantics(self):
472
 
        # a FakeNFS transport must mangle the way rename errors occur to
473
 
        # look like NFS problems.
474
 
        transport = self.get_nfs_transport('.')
475
 
        self.build_tree(['from/', 'from/foo', 'to/', 'to/bar'],
476
 
                        transport=transport)
477
 
        self.assertRaises(errors.ResourceBusy,
478
 
                          transport.rename, 'from', 'to')
479
 
 
480
 
 
481
 
class FakeVFATDecoratorTests(TestCaseInTempDir):
482
 
    """Tests for simulation of VFAT restrictions"""
483
 
 
484
 
    def get_vfat_transport(self, url):
485
 
        """Return vfat-backed transport for test directory"""
486
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
487
 
        return FakeVFATTransportDecorator('vfat+' + url)
488
 
 
489
 
    def test_transport_creation(self):
490
 
        from bzrlib.transport.fakevfat import FakeVFATTransportDecorator
491
 
        transport = self.get_vfat_transport('.')
492
 
        self.assertIsInstance(transport, FakeVFATTransportDecorator)
493
 
 
494
 
    def test_transport_mkdir(self):
495
 
        transport = self.get_vfat_transport('.')
496
 
        transport.mkdir('HELLO')
497
 
        self.assertTrue(transport.has('hello'))
498
 
        self.assertTrue(transport.has('Hello'))
499
 
 
500
 
    def test_forbidden_chars(self):
501
 
        transport = self.get_vfat_transport('.')
502
 
        self.assertRaises(ValueError, transport.has, "<NU>")
503
 
 
504
 
 
505
 
class BadTransportHandler(Transport):
506
 
    def __init__(self, base_url):
507
 
        raise DependencyNotPresent('some_lib', 'testing missing dependency')
508
 
 
509
 
 
510
 
class BackupTransportHandler(Transport):
511
 
    """Test transport that works as a backup for the BadTransportHandler"""
512
 
    pass
513
 
 
514
 
 
515
 
class TestTransportImplementation(TestCaseInTempDir):
516
 
    """Implementation verification for transports.
517
 
    
518
 
    To verify a transport we need a server factory, which is a callable
519
 
    that accepts no parameters and returns an implementation of
520
 
    bzrlib.transport.Server.
521
 
    
522
 
    That Server is then used to construct transport instances and test
523
 
    the transport via loopback activity.
524
 
 
525
 
    Currently this assumes that the Transport object is connected to the 
526
 
    current working directory.  So that whatever is done 
527
 
    through the transport, should show up in the working 
528
 
    directory, and vice-versa. This is a bug, because its possible to have
529
 
    URL schemes which provide access to something that may not be 
530
 
    result in storage on the local disk, i.e. due to file system limits, or 
531
 
    due to it being a database or some other non-filesystem tool.
532
 
 
533
 
    This also tests to make sure that the functions work with both
534
 
    generators and lists (assuming iter(list) is effectively a generator)
535
 
    """
536
 
    
537
 
    def setUp(self):
538
 
        super(TestTransportImplementation, self).setUp()
539
 
        self._server = self.transport_server()
540
 
        self._server.setUp()
541
 
        self.addCleanup(self._server.tearDown)
542
 
 
543
 
    def get_transport(self):
544
 
        """Return a connected transport to the local directory."""
545
 
        base_url = self._server.get_url()
546
 
        # try getting the transport via the regular interface:
547
 
        t = get_transport(base_url)
548
 
        if not isinstance(t, self.transport_class):
549
 
            # we did not get the correct transport class type. Override the
550
 
            # regular connection behaviour by direct construction.
551
 
            t = self.transport_class(base_url)
552
 
        return t
553
 
 
554
 
 
555
 
class TestLocalTransports(TestCase):
556
 
 
557
 
    def test_get_transport_from_abspath(self):
558
 
        here = os.path.abspath('.')
559
 
        t = get_transport(here)
560
 
        self.assertIsInstance(t, LocalTransport)
561
 
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')
562
 
 
563
 
    def test_get_transport_from_relpath(self):
564
 
        here = os.path.abspath('.')
565
 
        t = get_transport('.')
566
 
        self.assertIsInstance(t, LocalTransport)
567
 
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')
568
 
 
569
 
    def test_get_transport_from_local_url(self):
570
 
        here = os.path.abspath('.')
571
 
        here_url = urlutils.local_path_to_url(here) + '/'
572
 
        t = get_transport(here_url)
573
 
        self.assertIsInstance(t, LocalTransport)
574
 
        self.assertEquals(t.base, here_url)
575
 
 
576
 
    def test_local_abspath(self):
577
 
        here = os.path.abspath('.')
578
 
        t = get_transport(here)
579
 
        self.assertEquals(t.local_abspath(''), here)
580
 
 
581
 
 
582
 
class TestWin32LocalTransport(TestCase):
583
 
 
584
 
    def test_unc_clone_to_root(self):
585
 
        # Win32 UNC path like \\HOST\path
586
 
        # clone to root should stop at least at \\HOST part
587
 
        # not on \\
588
 
        t = EmulatedWin32LocalTransport('file://HOST/path/to/some/dir/')
589
 
        for i in xrange(4):
590
 
            t = t.clone('..')
591
 
        self.assertEquals(t.base, 'file://HOST/')
592
 
        # make sure we reach the root
593
 
        t = t.clone('..')
594
 
        self.assertEquals(t.base, 'file://HOST/')
595
 
 
596
 
 
597
 
def get_test_permutations():
598
 
    """Return transport permutations to be used in testing.
599
 
 
600
 
    This module registers some transports, but they're only for testing
601
 
    registration.  We don't really want to run all the transport tests against
602
 
    them.
603
 
    """
604
 
    return []