~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Martin Pool
  • Date: 2010-01-29 10:36:23 UTC
  • mto: This revision was merged to the branch mainline in revision 4992.
  • Revision ID: mbp@sourcefrog.net-20100129103623-hywka5hymo5z13jw
Change url to canonical.com or wiki, plus some doc improvements in passing

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 Canonical Ltd
 
1
# Copyright (C) 2004, 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
25
26
from StringIO import StringIO as pyStringIO
26
27
import stat
27
28
import sys
 
29
import unittest
28
30
 
29
31
from bzrlib import (
30
32
    errors,
31
33
    osutils,
 
34
    tests,
32
35
    urlutils,
33
36
    )
34
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
35
 
                           LockError, NoSmartServer, PathError,
36
 
                           TransportNotPossible, ConnectionError,
37
 
                           InvalidURL)
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           DirectoryNotEmpty,
 
39
                           FileExists,
 
40
                           InvalidURL,
 
41
                           LockError,
 
42
                           NoSuchFile,
 
43
                           NotLocalUrl,
 
44
                           PathError,
 
45
                           TransportNotPossible,
 
46
                           )
38
47
from bzrlib.osutils import getcwd
39
48
from bzrlib.smart import medium
40
 
from bzrlib.symbol_versioning import zero_eleven
41
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
 
49
from bzrlib.tests import (
 
50
    TestCaseInTempDir,
 
51
    TestSkipped,
 
52
    TestNotApplicable,
 
53
    multiply_tests,
 
54
    )
42
55
from bzrlib.tests.test_transport import TestTransportImplementation
43
 
from bzrlib.transport import memory, remote
44
 
import bzrlib.transport
 
56
from bzrlib.transport import (
 
57
    ConnectedTransport,
 
58
    get_transport,
 
59
    _get_transport_modules,
 
60
    )
 
61
from bzrlib.transport.memory import MemoryTransport
 
62
 
 
63
 
 
64
def get_transport_test_permutations(module):
 
65
    """Get the permutations module wants to have tested."""
 
66
    if getattr(module, 'get_test_permutations', None) is None:
 
67
        raise AssertionError(
 
68
            "transport module %s doesn't provide get_test_permutations()"
 
69
            % module.__name__)
 
70
        return []
 
71
    return module.get_test_permutations()
 
72
 
 
73
 
 
74
def transport_test_permutations():
 
75
    """Return a list of the klass, server_factory pairs to test."""
 
76
    result = []
 
77
    for module in _get_transport_modules():
 
78
        try:
 
79
            permutations = get_transport_test_permutations(
 
80
                reduce(getattr, (module).split('.')[1:], __import__(module)))
 
81
            for (klass, server_factory) in permutations:
 
82
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
83
                    {"transport_class":klass,
 
84
                     "transport_server":server_factory})
 
85
                result.append(scenario)
 
86
        except errors.DependencyNotPresent, e:
 
87
            # Continue even if a dependency prevents us
 
88
            # from adding this test
 
89
            pass
 
90
    return result
 
91
 
 
92
 
 
93
def load_tests(standard_tests, module, loader):
 
94
    """Multiply tests for tranport implementations."""
 
95
    result = loader.suiteClass()
 
96
    scenarios = transport_test_permutations()
 
97
    return multiply_tests(standard_tests, scenarios, result)
45
98
 
46
99
 
47
100
class TransportTests(TestTransportImplementation):
54
107
        """Check that transport.get(relpath).read() == content."""
55
108
        self.assertEqualDiff(content, transport.get(relpath).read())
56
109
 
 
110
    def test_ensure_base_missing(self):
 
111
        """.ensure_base() should create the directory if it doesn't exist"""
 
112
        t = self.get_transport()
 
113
        t_a = t.clone('a')
 
114
        if t_a.is_readonly():
 
115
            self.assertRaises(TransportNotPossible,
 
116
                              t_a.ensure_base)
 
117
            return
 
118
        self.assertTrue(t_a.ensure_base())
 
119
        self.assertTrue(t.has('a'))
 
120
 
 
121
    def test_ensure_base_exists(self):
 
122
        """.ensure_base() should just be happy if it already exists"""
 
123
        t = self.get_transport()
 
124
        if t.is_readonly():
 
125
            return
 
126
 
 
127
        t.mkdir('a')
 
128
        t_a = t.clone('a')
 
129
        # ensure_base returns False if it didn't create the base
 
130
        self.assertFalse(t_a.ensure_base())
 
131
 
 
132
    def test_ensure_base_missing_parent(self):
 
133
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
134
        t = self.get_transport()
 
135
        if t.is_readonly():
 
136
            return
 
137
 
 
138
        t_a = t.clone('a')
 
139
        t_b = t_a.clone('b')
 
140
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
141
 
 
142
    def test_external_url(self):
 
143
        """.external_url either works or raises InProcessTransport."""
 
144
        t = self.get_transport()
 
145
        try:
 
146
            t.external_url()
 
147
        except errors.InProcessTransport:
 
148
            pass
 
149
 
57
150
    def test_has(self):
58
151
        t = self.get_transport()
59
152
 
62
155
        self.assertEqual(True, t.has('a'))
63
156
        self.assertEqual(False, t.has('c'))
64
157
        self.assertEqual(True, t.has(urlutils.escape('%')))
65
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
66
 
                [True, True, False, False, True, False, True, False])
 
158
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
159
                                           'e', 'f', 'g', 'h'])),
 
160
                         [True, True, False, False,
 
161
                          True, False, True, False])
67
162
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
68
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
69
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
70
 
                [True, True, False, False, True, False, True, False])
 
163
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
164
                                           urlutils.escape('%%')]))
 
165
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
166
                                                'e', 'f', 'g', 'h']))),
 
167
                         [True, True, False, False,
 
168
                          True, False, True, False])
71
169
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
72
170
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
73
171
 
74
172
    def test_has_root_works(self):
 
173
        from bzrlib.smart import server
 
174
        if self.transport_server is server.SmartTCPServer_for_testing:
 
175
            raise TestNotApplicable(
 
176
                "SmartTCPServer_for_testing intentionally does not allow "
 
177
                "access to /.")
75
178
        current_transport = self.get_transport()
76
179
        self.assertTrue(current_transport.has('/'))
77
180
        root = current_transport.clone('/')
89
192
        self.build_tree(files, transport=t, line_endings='binary')
90
193
        self.check_transport_contents('contents of a\n', t, 'a')
91
194
        content_f = t.get_multi(files)
92
 
        for content, f in zip(contents, content_f):
 
195
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
196
        # evaluate their inputs, the transport requests should be issued and
 
197
        # handled sequentially (we don't want to force transport to buffer).
 
198
        for content, f in itertools.izip(contents, content_f):
93
199
            self.assertEqual(content, f.read())
94
200
 
95
201
        content_f = t.get_multi(iter(files))
96
 
        for content, f in zip(contents, content_f):
 
202
        # Use itertools.izip() for the same reason
 
203
        for content, f in itertools.izip(contents, content_f):
97
204
            self.assertEqual(content, f.read())
98
205
 
 
206
    def test_get_unknown_file(self):
 
207
        t = self.get_transport()
 
208
        files = ['a', 'b']
 
209
        contents = ['contents of a\n',
 
210
                    'contents of b\n',
 
211
                    ]
 
212
        self.build_tree(files, transport=t, line_endings='binary')
99
213
        self.assertRaises(NoSuchFile, t.get, 'c')
100
214
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
101
215
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
102
216
 
 
217
    def test_get_directory_read_gives_ReadError(self):
 
218
        """consistent errors for read() on a file returned by get()."""
 
219
        t = self.get_transport()
 
220
        if t.is_readonly():
 
221
            self.build_tree(['a directory/'])
 
222
        else:
 
223
            t.mkdir('a%20directory')
 
224
        # getting the file must either work or fail with a PathError
 
225
        try:
 
226
            a_file = t.get('a%20directory')
 
227
        except (errors.PathError, errors.RedirectRequested):
 
228
            # early failure return immediately.
 
229
            return
 
230
        # having got a file, read() must either work (i.e. http reading a dir
 
231
        # listing) or fail with ReadError
 
232
        try:
 
233
            a_file.read()
 
234
        except errors.ReadError:
 
235
            pass
 
236
 
103
237
    def test_get_bytes(self):
104
238
        t = self.get_transport()
105
239
 
115
249
        for content, fname in zip(contents, files):
116
250
            self.assertEqual(content, t.get_bytes(fname))
117
251
 
 
252
    def test_get_bytes_unknown_file(self):
 
253
        t = self.get_transport()
 
254
 
118
255
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
119
256
 
120
 
    def test_put(self):
121
 
        t = self.get_transport()
122
 
 
123
 
        if t.is_readonly():
124
 
            return
125
 
 
126
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
127
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
128
 
 
129
 
        self.applyDeprecated(zero_eleven,
130
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
131
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
132
 
 
133
 
        self.assertRaises(NoSuchFile,
134
 
            self.applyDeprecated,
135
 
            zero_eleven,
136
 
            t.put, 'path/doesnt/exist/c', StringIO('contents'))
 
257
    def test_get_with_open_write_stream_sees_all_content(self):
 
258
        t = self.get_transport()
 
259
        if t.is_readonly():
 
260
            return
 
261
        handle = t.open_write_stream('foo')
 
262
        try:
 
263
            handle.write('b')
 
264
            self.assertEqual('b', t.get('foo').read())
 
265
        finally:
 
266
            handle.close()
 
267
 
 
268
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
269
        t = self.get_transport()
 
270
        if t.is_readonly():
 
271
            return
 
272
        handle = t.open_write_stream('foo')
 
273
        try:
 
274
            handle.write('b')
 
275
            self.assertEqual('b', t.get_bytes('foo'))
 
276
            self.assertEqual('b', t.get('foo').read())
 
277
        finally:
 
278
            handle.close()
137
279
 
138
280
    def test_put_bytes(self):
139
281
        t = self.get_transport()
186
328
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
187
329
                               create_parent_dir=True)
188
330
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
189
 
        
 
331
 
190
332
        # But we still get NoSuchFile if we can't make the parent dir
191
333
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
192
334
                                       'contents\n',
214
356
        umask = osutils.get_umask()
215
357
        t.put_bytes('nomode', 'test text\n', mode=None)
216
358
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
217
 
        
 
359
 
218
360
    def test_put_bytes_non_atomic_permissions(self):
219
361
        t = self.get_transport()
220
362
 
248
390
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
249
391
                               dir_mode=0777, create_parent_dir=True)
250
392
        self.assertTransportMode(t, 'dir777', 0777)
251
 
        
 
393
 
252
394
    def test_put_file(self):
253
395
        t = self.get_transport()
254
396
 
257
399
                    t.put_file, 'a', StringIO('some text for a\n'))
258
400
            return
259
401
 
260
 
        t.put_file('a', StringIO('some text for a\n'))
 
402
        result = t.put_file('a', StringIO('some text for a\n'))
 
403
        # put_file returns the length of the data written
 
404
        self.assertEqual(16, result)
261
405
        self.failUnless(t.has('a'))
262
406
        self.check_transport_contents('some text for a\n', t, 'a')
263
407
        # Put also replaces contents
264
 
        t.put_file('a', StringIO('new\ncontents for\na\n'))
 
408
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
409
        self.assertEqual(19, result)
265
410
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
266
411
        self.assertRaises(NoSuchFile,
267
412
                          t.put_file, 'path/doesnt/exist/c',
299
444
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
300
445
                              create_parent_dir=True)
301
446
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
302
 
        
 
447
 
303
448
        # But we still get NoSuchFile if we can't make the parent dir
304
449
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
305
450
                                       StringIO('contents\n'),
323
468
        # Yes, you can put a file such that it becomes readonly
324
469
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
325
470
        self.assertTransportMode(t, 'mode400', 0400)
326
 
 
327
 
        # XXX: put_multi is deprecated, so do we really care anymore?
328
 
        self.applyDeprecated(zero_eleven, t.put_multi,
329
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
330
 
        self.assertTransportMode(t, 'mmode644', 0644)
331
 
 
332
471
        # The default permissions should be based on the current umask
333
472
        umask = osutils.get_umask()
334
473
        t.put_file('nomode', StringIO('test text\n'), mode=None)
335
474
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
336
 
        
 
475
 
337
476
    def test_put_file_non_atomic_permissions(self):
338
477
        t = self.get_transport()
339
478
 
356
495
        umask = osutils.get_umask()
357
496
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
358
497
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
359
 
        
 
498
 
360
499
        # We should also be able to set the mode for a parent directory
361
500
        # when it is created
362
501
        sio = StringIO()
398
537
        unicode_file = pyStringIO(u'\u1234')
399
538
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
400
539
 
401
 
    def test_put_multi(self):
402
 
        t = self.get_transport()
403
 
 
404
 
        if t.is_readonly():
405
 
            return
406
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
407
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
408
 
                          ('d', StringIO('contents\nfor d\n'))]
409
 
            ))
410
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
411
 
                [True, False, False, True])
412
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
413
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
414
 
 
415
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
416
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
417
 
                              ('d', StringIO('another contents\nfor d\n'))])
418
 
            ))
419
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
420
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
421
 
 
422
 
    def test_put_permissions(self):
423
 
        t = self.get_transport()
424
 
 
425
 
        if t.is_readonly():
426
 
            return
427
 
        if not t._can_roundtrip_unix_modebits():
428
 
            # Can't roundtrip, so no need to run this test
429
 
            return
430
 
        self.applyDeprecated(zero_eleven, t.put, 'mode644',
431
 
                             StringIO('test text\n'), mode=0644)
432
 
        self.assertTransportMode(t, 'mode644', 0644)
433
 
        self.applyDeprecated(zero_eleven, t.put, 'mode666',
434
 
                             StringIO('test text\n'), mode=0666)
435
 
        self.assertTransportMode(t, 'mode666', 0666)
436
 
        self.applyDeprecated(zero_eleven, t.put, 'mode600',
437
 
                             StringIO('test text\n'), mode=0600)
438
 
        self.assertTransportMode(t, 'mode600', 0600)
439
 
        # Yes, you can put a file such that it becomes readonly
440
 
        self.applyDeprecated(zero_eleven, t.put, 'mode400',
441
 
                             StringIO('test text\n'), mode=0400)
442
 
        self.assertTransportMode(t, 'mode400', 0400)
443
 
        self.applyDeprecated(zero_eleven, t.put_multi,
444
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
445
 
        self.assertTransportMode(t, 'mmode644', 0644)
446
 
 
447
 
        # The default permissions should be based on the current umask
448
 
        umask = osutils.get_umask()
449
 
        self.applyDeprecated(zero_eleven, t.put, 'nomode',
450
 
                             StringIO('test text\n'), mode=None)
451
 
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
452
 
        
453
540
    def test_mkdir(self):
454
541
        t = self.get_transport()
455
542
 
456
543
        if t.is_readonly():
457
 
            # cannot mkdir on readonly transports. We're not testing for 
 
544
            # cannot mkdir on readonly transports. We're not testing for
458
545
            # cache coherency because cache behaviour is not currently
459
546
            # defined for the transport interface.
460
547
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
481
568
 
482
569
        # we were testing that a local mkdir followed by a transport
483
570
        # mkdir failed thusly, but given that we * in one process * do not
484
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
571
        # concurrently fiddle with disk dirs and then use transport to do
485
572
        # things, the win here seems marginal compared to the constraint on
486
573
        # the interface. RBC 20051227
487
574
        t.mkdir('dir_g')
520
607
        t.mkdir('dnomode', mode=None)
521
608
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
522
609
 
 
610
    def test_opening_a_file_stream_creates_file(self):
 
611
        t = self.get_transport()
 
612
        if t.is_readonly():
 
613
            return
 
614
        handle = t.open_write_stream('foo')
 
615
        try:
 
616
            self.assertEqual('', t.get_bytes('foo'))
 
617
        finally:
 
618
            handle.close()
 
619
 
 
620
    def test_opening_a_file_stream_can_set_mode(self):
 
621
        t = self.get_transport()
 
622
        if t.is_readonly():
 
623
            return
 
624
        if not t._can_roundtrip_unix_modebits():
 
625
            # Can't roundtrip, so no need to run this test
 
626
            return
 
627
        def check_mode(name, mode, expected):
 
628
            handle = t.open_write_stream(name, mode=mode)
 
629
            handle.close()
 
630
            self.assertTransportMode(t, name, expected)
 
631
        check_mode('mode644', 0644, 0644)
 
632
        check_mode('mode666', 0666, 0666)
 
633
        check_mode('mode600', 0600, 0600)
 
634
        # The default permissions should be based on the current umask
 
635
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
636
 
523
637
    def test_copy_to(self):
524
638
        # FIXME: test:   same server to same server (partly done)
525
639
        # same protocol two servers
526
640
        # and    different protocols (done for now except for MemoryTransport.
527
641
        # - RBC 20060122
528
 
        from bzrlib.transport.memory import MemoryTransport
529
642
 
530
643
        def simple_copy_files(transport_from, transport_to):
531
644
            files = ['a', 'b', 'c', 'd']
571
684
            for f in files:
572
685
                self.assertTransportMode(temp_transport, f, mode)
573
686
 
574
 
    def test_append(self):
 
687
    def test_create_prefix(self):
575
688
        t = self.get_transport()
576
 
 
577
 
        if t.is_readonly():
578
 
            return
579
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
580
 
        t.put_bytes('b', 'contents\nfor b\n')
581
 
 
582
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
583
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
584
 
 
585
 
        self.check_transport_contents(
586
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
587
 
            t, 'a')
588
 
 
589
 
        # And we can create new files, too
590
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
591
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
592
 
        self.check_transport_contents('some text\nfor a missing file\n',
593
 
                                      t, 'c')
 
689
        sub = t.clone('foo').clone('bar')
 
690
        try:
 
691
            sub.create_prefix()
 
692
        except TransportNotPossible:
 
693
            self.assertTrue(t.is_readonly())
 
694
        else:
 
695
            self.assertTrue(t.has('foo/bar'))
 
696
 
594
697
    def test_append_file(self):
595
698
        t = self.get_transport()
596
699
 
700
803
                t.append_file, 'f', StringIO('f'), mode=None)
701
804
            return
702
805
        t.append_file('f', StringIO('f'), mode=None)
703
 
        
 
806
 
704
807
    def test_append_bytes_mode(self):
705
808
        # check append_bytes accepts a mode
706
809
        t = self.get_transport()
709
812
                t.append_bytes, 'f', 'f', mode=None)
710
813
            return
711
814
        t.append_bytes('f', 'f', mode=None)
712
 
        
 
815
 
713
816
    def test_delete(self):
714
817
        # TODO: Test Transport.delete
715
818
        t = self.get_transport()
754
857
        # plain "listdir".
755
858
        # self.assertEqual([], os.listdir('.'))
756
859
 
 
860
    def test_recommended_page_size(self):
 
861
        """Transports recommend a page size for partial access to files."""
 
862
        t = self.get_transport()
 
863
        self.assertIsInstance(t.recommended_page_size(), int)
 
864
 
757
865
    def test_rmdir(self):
758
866
        t = self.get_transport()
759
867
        # Not much to do with a readonly transport
771
879
 
772
880
    def test_rmdir_not_empty(self):
773
881
        """Deleting a non-empty directory raises an exception
774
 
        
 
882
 
775
883
        sftp (and possibly others) don't give us a specific "directory not
776
884
        empty" exception -- we can just see that the operation failed.
777
885
        """
784
892
 
785
893
    def test_rmdir_empty_but_similar_prefix(self):
786
894
        """rmdir does not get confused by sibling paths.
787
 
        
 
895
 
788
896
        A naive implementation of MemoryTransport would refuse to rmdir
789
897
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
790
898
        uses "path.startswith(dir)" on all file paths to determine if directory
827
935
        self.assertFalse(t.has('adir/bdir'))
828
936
        self.assertFalse(t.has('adir/bsubdir'))
829
937
 
 
938
    def test_rename_across_subdirs(self):
 
939
        t = self.get_transport()
 
940
        if t.is_readonly():
 
941
            raise TestNotApplicable("transport is readonly")
 
942
        t.mkdir('a')
 
943
        t.mkdir('b')
 
944
        ta = t.clone('a')
 
945
        tb = t.clone('b')
 
946
        ta.put_bytes('f', 'aoeu')
 
947
        ta.rename('f', '../b/f')
 
948
        self.assertTrue(tb.has('f'))
 
949
        self.assertFalse(ta.has('f'))
 
950
        self.assertTrue(t.has('b/f'))
 
951
 
830
952
    def test_delete_tree(self):
831
953
        t = self.get_transport()
832
954
 
842
964
        except TransportNotPossible:
843
965
            # ok, this transport does not support delete_tree
844
966
            return
845
 
        
 
967
 
846
968
        # did it delete that trivial case?
847
969
        self.assertRaises(NoSuchFile, t.stat, 'adir')
848
970
 
849
971
        self.build_tree(['adir/',
850
 
                         'adir/file', 
851
 
                         'adir/subdir/', 
852
 
                         'adir/subdir/file', 
 
972
                         'adir/file',
 
973
                         'adir/subdir/',
 
974
                         'adir/subdir/file',
853
975
                         'adir/subdir2/',
854
976
                         'adir/subdir2/file',
855
977
                         ], transport=t)
886
1008
        self.check_transport_contents('c this file\n', t, 'b')
887
1009
 
888
1010
        # TODO: Try to write a test for atomicity
889
 
        # TODO: Test moving into a non-existant subdirectory
 
1011
        # TODO: Test moving into a non-existent subdirectory
890
1012
        # TODO: Test Transport.move_multi
891
1013
 
892
1014
    def test_copy(self):
912
1034
 
913
1035
    def test_connection_error(self):
914
1036
        """ConnectionError is raised when connection is impossible.
915
 
        
916
 
        The error may be raised from either the constructor or the first
917
 
        operation on the transport.
 
1037
 
 
1038
        The error should be raised from the first operation on the transport.
918
1039
        """
919
1040
        try:
920
1041
            url = self._server.get_bogus_url()
921
1042
        except NotImplementedError:
922
1043
            raise TestSkipped("Transport %s has no bogus URL support." %
923
1044
                              self._server.__class__)
924
 
        # This should be:  but SSH still connects on construction. No COOKIE!
925
 
        # self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
926
 
        try:
927
 
            t = bzrlib.transport.get_transport(url)
928
 
            t.get('.bzr/branch')
929
 
        except (ConnectionError, NoSuchFile), e:
930
 
            pass
931
 
        except (Exception), e:
932
 
            self.fail('Wrong exception thrown (%s.%s): %s' 
933
 
                        % (e.__class__.__module__, e.__class__.__name__, e))
934
 
        else:
935
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1045
        t = get_transport(url)
 
1046
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
936
1047
 
937
1048
    def test_stat(self):
938
1049
        # TODO: Test stat, just try once, and if it throws, stop testing
947
1058
            return
948
1059
 
949
1060
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
950
 
        sizes = [14, 0, 16, 0, 18] 
 
1061
        sizes = [14, 0, 16, 0, 18]
951
1062
        self.build_tree(paths, transport=t, line_endings='binary')
952
1063
 
953
1064
        for path, size in zip(paths, sizes):
975
1086
    def test_list_dir(self):
976
1087
        # TODO: Test list_dir, just try once, and if it throws, stop testing
977
1088
        t = self.get_transport()
978
 
        
 
1089
 
979
1090
        if not t.listable():
980
1091
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
981
1092
            return
982
1093
 
983
 
        def sorted_list(d):
984
 
            l = list(t.list_dir(d))
 
1094
        def sorted_list(d, transport):
 
1095
            l = list(transport.list_dir(d))
985
1096
            l.sort()
986
1097
            return l
987
1098
 
988
 
        self.assertEqual([], sorted_list('.'))
 
1099
        self.assertEqual([], sorted_list('.', t))
989
1100
        # c2 is precisely one letter longer than c here to test that
990
1101
        # suffixing is not confused.
991
1102
        # a%25b checks that quoting is done consistently across transports
997
1108
            self.build_tree(tree_names)
998
1109
 
999
1110
        self.assertEqual(
1000
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
1001
 
        self.assertEqual(['d', 'e'], sorted_list('c'))
 
1111
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1112
        self.assertEqual(
 
1113
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1114
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1115
 
 
1116
        # Cloning the transport produces an equivalent listing
 
1117
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
1002
1118
 
1003
1119
        if not t.is_readonly():
1004
1120
            t.delete('c/d')
1006
1122
        else:
1007
1123
            os.unlink('c/d')
1008
1124
            os.unlink('b')
1009
 
            
1010
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
1011
 
        self.assertEqual(['e'], sorted_list('c'))
 
1125
 
 
1126
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1127
        self.assertEqual(['e'], sorted_list('c', t))
1012
1128
 
1013
1129
        self.assertListRaises(PathError, t.list_dir, 'q')
1014
1130
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1131
        # 'a' is a file, list_dir should raise an error
1015
1132
        self.assertListRaises(PathError, t.list_dir, 'a')
1016
1133
 
1017
1134
    def test_list_dir_result_is_url_escaped(self):
1023
1140
            self.build_tree(['a/', 'a/%'], transport=t)
1024
1141
        else:
1025
1142
            self.build_tree(['a/', 'a/%'])
1026
 
        
 
1143
 
1027
1144
        names = list(t.list_dir('a'))
1028
1145
        self.assertEqual(['%25'], names)
1029
1146
        self.assertIsInstance(names[0], str)
1030
1147
 
 
1148
    def test_clone_preserve_info(self):
 
1149
        t1 = self.get_transport()
 
1150
        if not isinstance(t1, ConnectedTransport):
 
1151
            raise TestSkipped("not a connected transport")
 
1152
 
 
1153
        t2 = t1.clone('subdir')
 
1154
        self.assertEquals(t1._scheme, t2._scheme)
 
1155
        self.assertEquals(t1._user, t2._user)
 
1156
        self.assertEquals(t1._password, t2._password)
 
1157
        self.assertEquals(t1._host, t2._host)
 
1158
        self.assertEquals(t1._port, t2._port)
 
1159
 
 
1160
    def test__reuse_for(self):
 
1161
        t = self.get_transport()
 
1162
        if not isinstance(t, ConnectedTransport):
 
1163
            raise TestSkipped("not a connected transport")
 
1164
 
 
1165
        def new_url(scheme=None, user=None, password=None,
 
1166
                    host=None, port=None, path=None):
 
1167
            """Build a new url from t.base changing only parts of it.
 
1168
 
 
1169
            Only the parameters different from None will be changed.
 
1170
            """
 
1171
            if scheme   is None: scheme   = t._scheme
 
1172
            if user     is None: user     = t._user
 
1173
            if password is None: password = t._password
 
1174
            if user     is None: user     = t._user
 
1175
            if host     is None: host     = t._host
 
1176
            if port     is None: port     = t._port
 
1177
            if path     is None: path     = t._path
 
1178
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1179
 
 
1180
        if t._scheme == 'ftp':
 
1181
            scheme = 'sftp'
 
1182
        else:
 
1183
            scheme = 'ftp'
 
1184
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1185
        if t._user == 'me':
 
1186
            user = 'you'
 
1187
        else:
 
1188
            user = 'me'
 
1189
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1190
        # passwords are not taken into account because:
 
1191
        # - it makes no sense to have two different valid passwords for the
 
1192
        #   same user
 
1193
        # - _password in ConnectedTransport is intended to collect what the
 
1194
        #   user specified from the command-line and there are cases where the
 
1195
        #   new url can contain no password (if the url was built from an
 
1196
        #   existing transport.base for example)
 
1197
        # - password are considered part of the credentials provided at
 
1198
        #   connection creation time and as such may not be present in the url
 
1199
        #   (they may be typed by the user when prompted for example)
 
1200
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1201
        # We will not connect, we can use a invalid host
 
1202
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1203
        if t._port == 1234:
 
1204
            port = 4321
 
1205
        else:
 
1206
            port = 1234
 
1207
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1208
        # No point in trying to reuse a transport for a local URL
 
1209
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1210
 
 
1211
    def test_connection_sharing(self):
 
1212
        t = self.get_transport()
 
1213
        if not isinstance(t, ConnectedTransport):
 
1214
            raise TestSkipped("not a connected transport")
 
1215
 
 
1216
        c = t.clone('subdir')
 
1217
        # Some transports will create the connection  only when needed
 
1218
        t.has('surely_not') # Force connection
 
1219
        self.assertIs(t._get_connection(), c._get_connection())
 
1220
 
 
1221
        # Temporary failure, we need to create a new dummy connection
 
1222
        new_connection = object()
 
1223
        t._set_connection(new_connection)
 
1224
        # Check that both transports use the same connection
 
1225
        self.assertIs(new_connection, t._get_connection())
 
1226
        self.assertIs(new_connection, c._get_connection())
 
1227
 
 
1228
    def test_reuse_connection_for_various_paths(self):
 
1229
        t = self.get_transport()
 
1230
        if not isinstance(t, ConnectedTransport):
 
1231
            raise TestSkipped("not a connected transport")
 
1232
 
 
1233
        t.has('surely_not') # Force connection
 
1234
        self.assertIsNot(None, t._get_connection())
 
1235
 
 
1236
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1237
        self.assertIsNot(t, subdir)
 
1238
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1239
 
 
1240
        home = subdir._reuse_for(t.base + 'home')
 
1241
        self.assertIs(t._get_connection(), home._get_connection())
 
1242
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1243
 
1031
1244
    def test_clone(self):
1032
1245
        # TODO: Test that clone moves up and down the filesystem
1033
1246
        t1 = self.get_transport()
1053
1266
        self.failIf(t3.has('b/d'))
1054
1267
 
1055
1268
        if t1.is_readonly():
1056
 
            open('b/d', 'wb').write('newfile\n')
 
1269
            self.build_tree_contents([('b/d', 'newfile\n')])
1057
1270
        else:
1058
1271
            t2.put_bytes('d', 'newfile\n')
1059
1272
 
1104
1317
        self.assertEqual('', t.relpath(t.base))
1105
1318
        # base ends with /
1106
1319
        self.assertEqual('', t.relpath(t.base[:-1]))
1107
 
        # subdirs which dont exist should still give relpaths.
 
1320
        # subdirs which don't exist should still give relpaths.
1108
1321
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1109
1322
        # trailing slash should be the same.
1110
1323
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1126
1339
        # that have aliasing problems like symlinks should go in backend
1127
1340
        # specific test cases.
1128
1341
        transport = self.get_transport()
1129
 
        
 
1342
 
1130
1343
        self.assertEqual(transport.base + 'relpath',
1131
1344
                         transport.abspath('relpath'))
1132
1345
 
1139
1352
        self.assertEqual(transport.clone("/").abspath('foo'),
1140
1353
                         transport.abspath("/foo"))
1141
1354
 
 
1355
    def test_win32_abspath(self):
 
1356
        # Note: we tried to set sys.platform='win32' so we could test on
 
1357
        # other platforms too, but then osutils does platform specific
 
1358
        # things at import time which defeated us...
 
1359
        if sys.platform != 'win32':
 
1360
            raise TestSkipped(
 
1361
                'Testing drive letters in abspath implemented only for win32')
 
1362
 
 
1363
        # smoke test for abspath on win32.
 
1364
        # a transport based on 'file:///' never fully qualifies the drive.
 
1365
        transport = get_transport("file:///")
 
1366
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1367
 
 
1368
        # but a transport that starts with a drive spec must keep it.
 
1369
        transport = get_transport("file:///C:/")
 
1370
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1371
 
1142
1372
    def test_local_abspath(self):
1143
1373
        transport = self.get_transport()
1144
1374
        try:
1145
1375
            p = transport.local_abspath('.')
1146
 
        except TransportNotPossible:
1147
 
            pass # This is not a local transport
 
1376
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1377
            # should be formattable
 
1378
            s = str(e)
1148
1379
        else:
1149
1380
            self.assertEqual(getcwd(), p)
1150
1381
 
1220
1451
                         'to/dir/b%2525z',
1221
1452
                         'to/bar',]))
1222
1453
 
 
1454
    def test_copy_tree_to_transport(self):
 
1455
        transport = self.get_transport()
 
1456
        if not transport.listable():
 
1457
            self.assertRaises(TransportNotPossible,
 
1458
                              transport.iter_files_recursive)
 
1459
            return
 
1460
        if transport.is_readonly():
 
1461
            return
 
1462
        self.build_tree(['from/',
 
1463
                         'from/dir/',
 
1464
                         'from/dir/foo',
 
1465
                         'from/dir/bar',
 
1466
                         'from/dir/b%25z', # make sure quoting is correct
 
1467
                         'from/bar'],
 
1468
                        transport=transport)
 
1469
        from_transport = transport.clone('from')
 
1470
        to_transport = transport.clone('to')
 
1471
        to_transport.ensure_base()
 
1472
        from_transport.copy_tree_to_transport(to_transport)
 
1473
        paths = set(transport.iter_files_recursive())
 
1474
        self.assertEqual(paths,
 
1475
                    set(['from/dir/foo',
 
1476
                         'from/dir/bar',
 
1477
                         'from/dir/b%2525z',
 
1478
                         'from/bar',
 
1479
                         'to/dir/foo',
 
1480
                         'to/dir/bar',
 
1481
                         'to/dir/b%2525z',
 
1482
                         'to/bar',]))
 
1483
 
1223
1484
    def test_unicode_paths(self):
1224
1485
        """Test that we can read/write files with Unicode names."""
1225
1486
        t = self.get_transport()
1236
1497
                 u'\u65e5', # Kanji person
1237
1498
                ]
1238
1499
 
 
1500
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1501
        if no_unicode_support:
 
1502
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1503
 
1239
1504
        try:
1240
1505
            self.build_tree(files, transport=t, line_endings='binary')
1241
1506
        except UnicodeError:
1251
1516
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1252
1517
 
1253
1518
    def test_connect_twice_is_same_content(self):
1254
 
        # check that our server (whatever it is) is accessable reliably
 
1519
        # check that our server (whatever it is) is accessible reliably
1255
1520
        # via get_transport and multiple connections share content.
1256
1521
        transport = self.get_transport()
1257
1522
        if transport.is_readonly():
1258
1523
            return
1259
1524
        transport.put_bytes('foo', 'bar')
1260
 
        transport2 = self.get_transport()
1261
 
        self.check_transport_contents('bar', transport2, 'foo')
1262
 
        # its base should be usable.
1263
 
        transport2 = bzrlib.transport.get_transport(transport.base)
1264
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1525
        transport3 = self.get_transport()
 
1526
        self.check_transport_contents('bar', transport3, 'foo')
1265
1527
 
1266
1528
        # now opening at a relative url should give use a sane result:
1267
1529
        transport.mkdir('newdir')
1268
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1269
 
        transport2 = transport2.clone('..')
1270
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1530
        transport5 = self.get_transport('newdir')
 
1531
        transport6 = transport5.clone('..')
 
1532
        self.check_transport_contents('bar', transport6, 'foo')
1271
1533
 
1272
1534
    def test_lock_write(self):
1273
1535
        """Test transport-level write locks.
1334
1596
        self.assertEqual(d[2], (0, '0'))
1335
1597
        self.assertEqual(d[3], (3, '34'))
1336
1598
 
 
1599
    def test_readv_with_adjust_for_latency(self):
 
1600
        transport = self.get_transport()
 
1601
        # the adjust for latency flag expands the data region returned
 
1602
        # according to a per-transport heuristic, so testing is a little
 
1603
        # tricky as we need more data than the largest combining that our
 
1604
        # transports do. To accomodate this we generate random data and cross
 
1605
        # reference the returned data with the random data. To avoid doing
 
1606
        # multiple large random byte look ups we do several tests on the same
 
1607
        # backing data.
 
1608
        content = osutils.rand_bytes(200*1024)
 
1609
        content_size = len(content)
 
1610
        if transport.is_readonly():
 
1611
            self.build_tree_contents([('a', content)])
 
1612
        else:
 
1613
            transport.put_bytes('a', content)
 
1614
        def check_result_data(result_vector):
 
1615
            for item in result_vector:
 
1616
                data_len = len(item[1])
 
1617
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1618
 
 
1619
        # start corner case
 
1620
        result = list(transport.readv('a', ((0, 30),),
 
1621
            adjust_for_latency=True, upper_limit=content_size))
 
1622
        # we expect 1 result, from 0, to something > 30
 
1623
        self.assertEqual(1, len(result))
 
1624
        self.assertEqual(0, result[0][0])
 
1625
        self.assertTrue(len(result[0][1]) >= 30)
 
1626
        check_result_data(result)
 
1627
        # end of file corner case
 
1628
        result = list(transport.readv('a', ((204700, 100),),
 
1629
            adjust_for_latency=True, upper_limit=content_size))
 
1630
        # we expect 1 result, from 204800- its length, to the end
 
1631
        self.assertEqual(1, len(result))
 
1632
        data_len = len(result[0][1])
 
1633
        self.assertEqual(204800-data_len, result[0][0])
 
1634
        self.assertTrue(data_len >= 100)
 
1635
        check_result_data(result)
 
1636
        # out of order ranges are made in order
 
1637
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1638
            adjust_for_latency=True, upper_limit=content_size))
 
1639
        # we expect 2 results, in order, start and end.
 
1640
        self.assertEqual(2, len(result))
 
1641
        # start
 
1642
        data_len = len(result[0][1])
 
1643
        self.assertEqual(0, result[0][0])
 
1644
        self.assertTrue(data_len >= 30)
 
1645
        # end
 
1646
        data_len = len(result[1][1])
 
1647
        self.assertEqual(204800-data_len, result[1][0])
 
1648
        self.assertTrue(data_len >= 100)
 
1649
        check_result_data(result)
 
1650
        # close ranges get combined (even if out of order)
 
1651
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1652
            result = list(transport.readv('a', request_vector,
 
1653
                adjust_for_latency=True, upper_limit=content_size))
 
1654
            self.assertEqual(1, len(result))
 
1655
            data_len = len(result[0][1])
 
1656
            # minimum length is from 400 to 1034 - 634
 
1657
            self.assertTrue(data_len >= 634)
 
1658
            # must contain the region 400 to 1034
 
1659
            self.assertTrue(result[0][0] <= 400)
 
1660
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1661
            check_result_data(result)
 
1662
 
 
1663
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1664
        transport = self.get_transport()
 
1665
        # test from observed failure case.
 
1666
        if transport.is_readonly():
 
1667
            file('a', 'w').write('a'*1024*1024)
 
1668
        else:
 
1669
            transport.put_bytes('a', 'a'*1024*1024)
 
1670
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1671
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1672
            (465373, 800), (947422, 800)]
 
1673
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1674
        found_items = [False]*9
 
1675
        for pos, (start, length) in enumerate(broken_vector):
 
1676
            # check the range is covered by the result
 
1677
            for offset, data in results:
 
1678
                if offset <= start and start + length <= offset + len(data):
 
1679
                    found_items[pos] = True
 
1680
        self.assertEqual([True]*9, found_items)
 
1681
 
 
1682
    def test_get_with_open_write_stream_sees_all_content(self):
 
1683
        t = self.get_transport()
 
1684
        if t.is_readonly():
 
1685
            return
 
1686
        handle = t.open_write_stream('foo')
 
1687
        try:
 
1688
            handle.write('bcd')
 
1689
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1690
        finally:
 
1691
            handle.close()
 
1692
 
1337
1693
    def test_get_smart_medium(self):
1338
1694
        """All transports must either give a smart medium, or know they can't.
1339
1695
        """