~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-10-18 11:57:18 UTC
  • mfrom: (5505.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20101018115718-cbuoc2gafnjldngk
(vila) Document hunk editing when shelving.(Neil Martinsen-Burrell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2004, 2005, 2006 by Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for Transport implementations.
18
18
 
20
20
TransportTestProviderAdapter.
21
21
"""
22
22
 
 
23
import itertools
23
24
import os
24
25
from cStringIO import StringIO
 
26
from StringIO import StringIO as pyStringIO
25
27
import stat
26
28
import sys
27
29
 
28
30
from bzrlib import (
 
31
    errors,
29
32
    osutils,
 
33
    pyutils,
 
34
    tests,
30
35
    urlutils,
31
36
    )
32
 
from bzrlib.errors import (DirectoryNotEmpty, NoSuchFile, FileExists,
33
 
                           LockError, PathError,
34
 
                           TransportNotPossible, ConnectionError,
35
 
                           InvalidURL)
 
37
from bzrlib.errors import (ConnectionError,
 
38
                           FileExists,
 
39
                           InvalidURL,
 
40
                           NoSuchFile,
 
41
                           PathError,
 
42
                           TransportNotPossible,
 
43
                           )
36
44
from bzrlib.osutils import getcwd
37
 
from bzrlib.symbol_versioning import zero_eleven
38
 
from bzrlib.tests import TestCaseInTempDir, TestSkipped
 
45
from bzrlib.smart import medium
 
46
from bzrlib.tests import (
 
47
    TestSkipped,
 
48
    TestNotApplicable,
 
49
    multiply_tests,
 
50
    )
 
51
from bzrlib.tests import test_server
39
52
from bzrlib.tests.test_transport import TestTransportImplementation
40
 
from bzrlib.transport import memory
41
 
import bzrlib.transport
42
 
 
43
 
 
44
 
def _append(fn, txt):
45
 
    """Append the given text (file-like object) to the supplied filename."""
46
 
    f = open(fn, 'ab')
47
 
    try:
48
 
        f.write(txt.read())
49
 
    finally:
50
 
        f.close()
 
53
from bzrlib.transport import (
 
54
    ConnectedTransport,
 
55
    get_transport,
 
56
    _get_transport_modules,
 
57
    )
 
58
from bzrlib.transport.memory import MemoryTransport
 
59
 
 
60
 
 
61
def get_transport_test_permutations(module):
 
62
    """Get the permutations module wants to have tested."""
 
63
    if getattr(module, 'get_test_permutations', None) is None:
 
64
        raise AssertionError(
 
65
            "transport module %s doesn't provide get_test_permutations()"
 
66
            % module.__name__)
 
67
        return []
 
68
    return module.get_test_permutations()
 
69
 
 
70
 
 
71
def transport_test_permutations():
 
72
    """Return a list of the klass, server_factory pairs to test."""
 
73
    result = []
 
74
    for module in _get_transport_modules():
 
75
        try:
 
76
            permutations = get_transport_test_permutations(
 
77
                pyutils.get_named_object(module))
 
78
            for (klass, server_factory) in permutations:
 
79
                scenario = ('%s,%s' % (klass.__name__, server_factory.__name__),
 
80
                    {"transport_class":klass,
 
81
                     "transport_server":server_factory})
 
82
                result.append(scenario)
 
83
        except errors.DependencyNotPresent, e:
 
84
            # Continue even if a dependency prevents us
 
85
            # from adding this test
 
86
            pass
 
87
    return result
 
88
 
 
89
 
 
90
def load_tests(standard_tests, module, loader):
 
91
    """Multiply tests for tranport implementations."""
 
92
    result = loader.suiteClass()
 
93
    scenarios = transport_test_permutations()
 
94
    return multiply_tests(standard_tests, scenarios, result)
51
95
 
52
96
 
53
97
class TransportTests(TestTransportImplementation):
54
98
 
 
99
    def setUp(self):
 
100
        super(TransportTests, self).setUp()
 
101
        self._captureVar('BZR_NO_SMART_VFS', None)
 
102
 
55
103
    def check_transport_contents(self, content, transport, relpath):
56
104
        """Check that transport.get(relpath).read() == content."""
57
105
        self.assertEqualDiff(content, transport.get(relpath).read())
58
106
 
59
 
    def assertListRaises(self, excClass, func, *args, **kwargs):
60
 
        """Fail unless excClass is raised when the iterator from func is used.
61
 
        
62
 
        Many transport functions can return generators this makes sure
63
 
        to wrap them in a list() call to make sure the whole generator
64
 
        is run, and that the proper exception is raised.
65
 
        """
 
107
    def test_ensure_base_missing(self):
 
108
        """.ensure_base() should create the directory if it doesn't exist"""
 
109
        t = self.get_transport()
 
110
        t_a = t.clone('a')
 
111
        if t_a.is_readonly():
 
112
            self.assertRaises(TransportNotPossible,
 
113
                              t_a.ensure_base)
 
114
            return
 
115
        self.assertTrue(t_a.ensure_base())
 
116
        self.assertTrue(t.has('a'))
 
117
 
 
118
    def test_ensure_base_exists(self):
 
119
        """.ensure_base() should just be happy if it already exists"""
 
120
        t = self.get_transport()
 
121
        if t.is_readonly():
 
122
            return
 
123
 
 
124
        t.mkdir('a')
 
125
        t_a = t.clone('a')
 
126
        # ensure_base returns False if it didn't create the base
 
127
        self.assertFalse(t_a.ensure_base())
 
128
 
 
129
    def test_ensure_base_missing_parent(self):
 
130
        """.ensure_base() will fail if the parent dir doesn't exist"""
 
131
        t = self.get_transport()
 
132
        if t.is_readonly():
 
133
            return
 
134
 
 
135
        t_a = t.clone('a')
 
136
        t_b = t_a.clone('b')
 
137
        self.assertRaises(NoSuchFile, t_b.ensure_base)
 
138
 
 
139
    def test_external_url(self):
 
140
        """.external_url either works or raises InProcessTransport."""
 
141
        t = self.get_transport()
66
142
        try:
67
 
            list(func(*args, **kwargs))
68
 
        except excClass:
69
 
            return
70
 
        else:
71
 
            if getattr(excClass,'__name__', None) is not None:
72
 
                excName = excClass.__name__
73
 
            else:
74
 
                excName = str(excClass)
75
 
            raise self.failureException, "%s not raised" % excName
 
143
            t.external_url()
 
144
        except errors.InProcessTransport:
 
145
            pass
76
146
 
77
147
    def test_has(self):
78
148
        t = self.get_transport()
82
152
        self.assertEqual(True, t.has('a'))
83
153
        self.assertEqual(False, t.has('c'))
84
154
        self.assertEqual(True, t.has(urlutils.escape('%')))
85
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'])),
86
 
                [True, True, False, False, True, False, True, False])
 
155
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd',
 
156
                                           'e', 'f', 'g', 'h'])),
 
157
                         [True, True, False, False,
 
158
                          True, False, True, False])
87
159
        self.assertEqual(True, t.has_any(['a', 'b', 'c']))
88
 
        self.assertEqual(False, t.has_any(['c', 'd', 'f', urlutils.escape('%%')]))
89
 
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']))),
90
 
                [True, True, False, False, True, False, True, False])
 
160
        self.assertEqual(False, t.has_any(['c', 'd', 'f',
 
161
                                           urlutils.escape('%%')]))
 
162
        self.assertEqual(list(t.has_multi(iter(['a', 'b', 'c', 'd',
 
163
                                                'e', 'f', 'g', 'h']))),
 
164
                         [True, True, False, False,
 
165
                          True, False, True, False])
91
166
        self.assertEqual(False, t.has_any(['c', 'c', 'c']))
92
167
        self.assertEqual(True, t.has_any(['b', 'b', 'b']))
93
168
 
 
169
    def test_has_root_works(self):
 
170
        if self.transport_server is test_server.SmartTCPServer_for_testing:
 
171
            raise TestNotApplicable(
 
172
                "SmartTCPServer_for_testing intentionally does not allow "
 
173
                "access to /.")
 
174
        current_transport = self.get_transport()
 
175
        self.assertTrue(current_transport.has('/'))
 
176
        root = current_transport.clone('/')
 
177
        self.assertTrue(root.has(''))
 
178
 
94
179
    def test_get(self):
95
180
        t = self.get_transport()
96
181
 
103
188
        self.build_tree(files, transport=t, line_endings='binary')
104
189
        self.check_transport_contents('contents of a\n', t, 'a')
105
190
        content_f = t.get_multi(files)
106
 
        for content, f in zip(contents, content_f):
 
191
        # Use itertools.izip() instead of use zip() or map(), since they fully
 
192
        # evaluate their inputs, the transport requests should be issued and
 
193
        # handled sequentially (we don't want to force transport to buffer).
 
194
        for content, f in itertools.izip(contents, content_f):
107
195
            self.assertEqual(content, f.read())
108
196
 
109
197
        content_f = t.get_multi(iter(files))
110
 
        for content, f in zip(contents, content_f):
 
198
        # Use itertools.izip() for the same reason
 
199
        for content, f in itertools.izip(contents, content_f):
111
200
            self.assertEqual(content, f.read())
112
201
 
 
202
    def test_get_unknown_file(self):
 
203
        t = self.get_transport()
 
204
        files = ['a', 'b']
 
205
        contents = ['contents of a\n',
 
206
                    'contents of b\n',
 
207
                    ]
 
208
        self.build_tree(files, transport=t, line_endings='binary')
113
209
        self.assertRaises(NoSuchFile, t.get, 'c')
114
210
        self.assertListRaises(NoSuchFile, t.get_multi, ['a', 'b', 'c'])
115
211
        self.assertListRaises(NoSuchFile, t.get_multi, iter(['a', 'b', 'c']))
116
212
 
 
213
    def test_get_directory_read_gives_ReadError(self):
 
214
        """consistent errors for read() on a file returned by get()."""
 
215
        t = self.get_transport()
 
216
        if t.is_readonly():
 
217
            self.build_tree(['a directory/'])
 
218
        else:
 
219
            t.mkdir('a%20directory')
 
220
        # getting the file must either work or fail with a PathError
 
221
        try:
 
222
            a_file = t.get('a%20directory')
 
223
        except (errors.PathError, errors.RedirectRequested):
 
224
            # early failure return immediately.
 
225
            return
 
226
        # having got a file, read() must either work (i.e. http reading a dir
 
227
        # listing) or fail with ReadError
 
228
        try:
 
229
            a_file.read()
 
230
        except errors.ReadError:
 
231
            pass
 
232
 
117
233
    def test_get_bytes(self):
118
234
        t = self.get_transport()
119
235
 
129
245
        for content, fname in zip(contents, files):
130
246
            self.assertEqual(content, t.get_bytes(fname))
131
247
 
 
248
    def test_get_bytes_unknown_file(self):
 
249
        t = self.get_transport()
132
250
        self.assertRaises(NoSuchFile, t.get_bytes, 'c')
133
251
 
134
 
    def test_put(self):
135
 
        t = self.get_transport()
136
 
 
137
 
        if t.is_readonly():
138
 
            return
139
 
 
140
 
        self.applyDeprecated(zero_eleven, t.put, 'a', 'string\ncontents\n')
141
 
        self.check_transport_contents('string\ncontents\n', t, 'a')
142
 
 
143
 
        self.applyDeprecated(zero_eleven,
144
 
                             t.put, 'b', StringIO('file-like\ncontents\n'))
145
 
        self.check_transport_contents('file-like\ncontents\n', t, 'b')
 
252
    def test_get_with_open_write_stream_sees_all_content(self):
 
253
        t = self.get_transport()
 
254
        if t.is_readonly():
 
255
            return
 
256
        handle = t.open_write_stream('foo')
 
257
        try:
 
258
            handle.write('b')
 
259
            self.assertEqual('b', t.get('foo').read())
 
260
        finally:
 
261
            handle.close()
 
262
 
 
263
    def test_get_bytes_with_open_write_stream_sees_all_content(self):
 
264
        t = self.get_transport()
 
265
        if t.is_readonly():
 
266
            return
 
267
        handle = t.open_write_stream('foo')
 
268
        try:
 
269
            handle.write('b')
 
270
            self.assertEqual('b', t.get_bytes('foo'))
 
271
            self.assertEqual('b', t.get('foo').read())
 
272
        finally:
 
273
            handle.close()
146
274
 
147
275
    def test_put_bytes(self):
148
276
        t = self.get_transport()
195
323
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
196
324
                               create_parent_dir=True)
197
325
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
198
 
        
 
326
 
199
327
        # But we still get NoSuchFile if we can't make the parent dir
200
328
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'not/there/a',
201
329
                                       'contents\n',
223
351
        umask = osutils.get_umask()
224
352
        t.put_bytes('nomode', 'test text\n', mode=None)
225
353
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
226
 
        
 
354
 
227
355
    def test_put_bytes_non_atomic_permissions(self):
228
356
        t = self.get_transport()
229
357
 
257
385
        t.put_bytes_non_atomic('dir777/mode664', 'test text\n', mode=0664,
258
386
                               dir_mode=0777, create_parent_dir=True)
259
387
        self.assertTransportMode(t, 'dir777', 0777)
260
 
        
 
388
 
261
389
    def test_put_file(self):
262
390
        t = self.get_transport()
263
391
 
266
394
                    t.put_file, 'a', StringIO('some text for a\n'))
267
395
            return
268
396
 
269
 
        t.put_file('a', StringIO('some text for a\n'))
 
397
        result = t.put_file('a', StringIO('some text for a\n'))
 
398
        # put_file returns the length of the data written
 
399
        self.assertEqual(16, result)
270
400
        self.failUnless(t.has('a'))
271
401
        self.check_transport_contents('some text for a\n', t, 'a')
272
402
        # Put also replaces contents
273
 
        t.put_file('a', StringIO('new\ncontents for\na\n'))
 
403
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
 
404
        self.assertEqual(19, result)
274
405
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
275
406
        self.assertRaises(NoSuchFile,
276
407
                          t.put_file, 'path/doesnt/exist/c',
308
439
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
309
440
                              create_parent_dir=True)
310
441
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
311
 
        
 
442
 
312
443
        # But we still get NoSuchFile if we can't make the parent dir
313
444
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'not/there/a',
314
445
                                       StringIO('contents\n'),
332
463
        # Yes, you can put a file such that it becomes readonly
333
464
        t.put_file('mode400', StringIO('test text\n'), mode=0400)
334
465
        self.assertTransportMode(t, 'mode400', 0400)
335
 
 
336
 
        # XXX: put_multi is deprecated, so do we really care anymore?
337
 
        self.applyDeprecated(zero_eleven, t.put_multi,
338
 
                             [('mmode644', StringIO('text\n'))], mode=0644)
339
 
        self.assertTransportMode(t, 'mmode644', 0644)
340
 
 
341
466
        # The default permissions should be based on the current umask
342
467
        umask = osutils.get_umask()
343
468
        t.put_file('nomode', StringIO('test text\n'), mode=None)
344
469
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
345
 
        
 
470
 
346
471
    def test_put_file_non_atomic_permissions(self):
347
472
        t = self.get_transport()
348
473
 
365
490
        umask = osutils.get_umask()
366
491
        t.put_file_non_atomic('nomode', StringIO('test text\n'), mode=None)
367
492
        self.assertTransportMode(t, 'nomode', 0666 & ~umask)
368
 
        
 
493
 
369
494
        # We should also be able to set the mode for a parent directory
370
495
        # when it is created
371
496
        sio = StringIO()
379
504
                              dir_mode=0777, create_parent_dir=True)
380
505
        self.assertTransportMode(t, 'dir777', 0777)
381
506
 
382
 
    def test_put_multi(self):
383
 
        t = self.get_transport()
384
 
 
385
 
        if t.is_readonly():
386
 
            return
387
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
388
 
            t.put_multi, [('a', StringIO('new\ncontents for\na\n')),
389
 
                          ('d', StringIO('contents\nfor d\n'))]
390
 
            ))
391
 
        self.assertEqual(list(t.has_multi(['a', 'b', 'c', 'd'])),
392
 
                [True, False, False, True])
393
 
        self.check_transport_contents('new\ncontents for\na\n', t, 'a')
394
 
        self.check_transport_contents('contents\nfor d\n', t, 'd')
395
 
 
396
 
        self.assertEqual(2, self.applyDeprecated(zero_eleven,
397
 
            t.put_multi, iter([('a', StringIO('diff\ncontents for\na\n')),
398
 
                              ('d', StringIO('another contents\nfor d\n'))])
399
 
            ))
400
 
        self.check_transport_contents('diff\ncontents for\na\n', t, 'a')
401
 
        self.check_transport_contents('another contents\nfor d\n', t, 'd')
 
507
    def test_put_bytes_unicode(self):
 
508
        # Expect put_bytes to raise AssertionError or UnicodeEncodeError if
 
509
        # given unicode "bytes".  UnicodeEncodeError doesn't really make sense
 
510
        # (we don't want to encode unicode here at all, callers should be
 
511
        # strictly passing bytes to put_bytes), but we allow it for backwards
 
512
        # compatibility.  At some point we should use a specific exception.
 
513
        # See https://bugs.launchpad.net/bzr/+bug/106898.
 
514
        t = self.get_transport()
 
515
        if t.is_readonly():
 
516
            return
 
517
        unicode_string = u'\u1234'
 
518
        self.assertRaises(
 
519
            (AssertionError, UnicodeEncodeError),
 
520
            t.put_bytes, 'foo', unicode_string)
 
521
 
 
522
    def test_put_file_unicode(self):
 
523
        # Like put_bytes, except with a StringIO.StringIO of a unicode string.
 
524
        # This situation can happen (and has) if code is careless about the type
 
525
        # of "string" they initialise/write to a StringIO with.  We cannot use
 
526
        # cStringIO, because it never returns unicode from read.
 
527
        # Like put_bytes, UnicodeEncodeError isn't quite the right exception to
 
528
        # raise, but we raise it for hysterical raisins.
 
529
        t = self.get_transport()
 
530
        if t.is_readonly():
 
531
            return
 
532
        unicode_file = pyStringIO(u'\u1234')
 
533
        self.assertRaises(UnicodeEncodeError, t.put_file, 'foo', unicode_file)
402
534
 
403
535
    def test_mkdir(self):
404
536
        t = self.get_transport()
405
537
 
406
538
        if t.is_readonly():
407
 
            # cannot mkdir on readonly transports. We're not testing for 
 
539
            # cannot mkdir on readonly transports. We're not testing for
408
540
            # cache coherency because cache behaviour is not currently
409
541
            # defined for the transport interface.
410
542
            self.assertRaises(TransportNotPossible, t.mkdir, '.')
431
563
 
432
564
        # we were testing that a local mkdir followed by a transport
433
565
        # mkdir failed thusly, but given that we * in one process * do not
434
 
        # concurrently fiddle with disk dirs and then use transport to do 
 
566
        # concurrently fiddle with disk dirs and then use transport to do
435
567
        # things, the win here seems marginal compared to the constraint on
436
568
        # the interface. RBC 20051227
437
569
        t.mkdir('dir_g')
470
602
        t.mkdir('dnomode', mode=None)
471
603
        self.assertTransportMode(t, 'dnomode', 0777 & ~umask)
472
604
 
 
605
    def test_opening_a_file_stream_creates_file(self):
 
606
        t = self.get_transport()
 
607
        if t.is_readonly():
 
608
            return
 
609
        handle = t.open_write_stream('foo')
 
610
        try:
 
611
            self.assertEqual('', t.get_bytes('foo'))
 
612
        finally:
 
613
            handle.close()
 
614
 
 
615
    def test_opening_a_file_stream_can_set_mode(self):
 
616
        t = self.get_transport()
 
617
        if t.is_readonly():
 
618
            return
 
619
        if not t._can_roundtrip_unix_modebits():
 
620
            # Can't roundtrip, so no need to run this test
 
621
            return
 
622
        def check_mode(name, mode, expected):
 
623
            handle = t.open_write_stream(name, mode=mode)
 
624
            handle.close()
 
625
            self.assertTransportMode(t, name, expected)
 
626
        check_mode('mode644', 0644, 0644)
 
627
        check_mode('mode666', 0666, 0666)
 
628
        check_mode('mode600', 0600, 0600)
 
629
        # The default permissions should be based on the current umask
 
630
        check_mode('nomode', None, 0666 & ~osutils.get_umask())
 
631
 
473
632
    def test_copy_to(self):
474
633
        # FIXME: test:   same server to same server (partly done)
475
634
        # same protocol two servers
476
635
        # and    different protocols (done for now except for MemoryTransport.
477
636
        # - RBC 20060122
478
 
        from bzrlib.transport.memory import MemoryTransport
479
637
 
480
638
        def simple_copy_files(transport_from, transport_to):
481
639
            files = ['a', 'b', 'c', 'd']
521
679
            for f in files:
522
680
                self.assertTransportMode(temp_transport, f, mode)
523
681
 
524
 
    def test_append(self):
 
682
    def test_create_prefix(self):
525
683
        t = self.get_transport()
526
 
 
527
 
        if t.is_readonly():
528
 
            return
529
 
        t.put_bytes('a', 'diff\ncontents for\na\n')
530
 
        t.put_bytes('b', 'contents\nfor b\n')
531
 
 
532
 
        self.assertEqual(20, self.applyDeprecated(zero_eleven,
533
 
            t.append, 'a', StringIO('add\nsome\nmore\ncontents\n')))
534
 
 
535
 
        self.check_transport_contents(
536
 
            'diff\ncontents for\na\nadd\nsome\nmore\ncontents\n',
537
 
            t, 'a')
538
 
 
539
 
        # And we can create new files, too
540
 
        self.assertEqual(0, self.applyDeprecated(zero_eleven,
541
 
            t.append, 'c', StringIO('some text\nfor a missing file\n')))
542
 
        self.check_transport_contents('some text\nfor a missing file\n',
543
 
                                      t, 'c')
 
684
        sub = t.clone('foo').clone('bar')
 
685
        try:
 
686
            sub.create_prefix()
 
687
        except TransportNotPossible:
 
688
            self.assertTrue(t.is_readonly())
 
689
        else:
 
690
            self.assertTrue(t.has('foo/bar'))
 
691
 
544
692
    def test_append_file(self):
545
693
        t = self.get_transport()
546
694
 
650
798
                t.append_file, 'f', StringIO('f'), mode=None)
651
799
            return
652
800
        t.append_file('f', StringIO('f'), mode=None)
653
 
        
 
801
 
654
802
    def test_append_bytes_mode(self):
655
803
        # check append_bytes accepts a mode
656
804
        t = self.get_transport()
659
807
                t.append_bytes, 'f', 'f', mode=None)
660
808
            return
661
809
        t.append_bytes('f', 'f', mode=None)
662
 
        
 
810
 
663
811
    def test_delete(self):
664
812
        # TODO: Test Transport.delete
665
813
        t = self.get_transport()
704
852
        # plain "listdir".
705
853
        # self.assertEqual([], os.listdir('.'))
706
854
 
 
855
    def test_recommended_page_size(self):
 
856
        """Transports recommend a page size for partial access to files."""
 
857
        t = self.get_transport()
 
858
        self.assertIsInstance(t.recommended_page_size(), int)
 
859
 
707
860
    def test_rmdir(self):
708
861
        t = self.get_transport()
709
862
        # Not much to do with a readonly transport
721
874
 
722
875
    def test_rmdir_not_empty(self):
723
876
        """Deleting a non-empty directory raises an exception
724
 
        
 
877
 
725
878
        sftp (and possibly others) don't give us a specific "directory not
726
879
        empty" exception -- we can just see that the operation failed.
727
880
        """
732
885
        t.mkdir('adir/bdir')
733
886
        self.assertRaises(PathError, t.rmdir, 'adir')
734
887
 
 
888
    def test_rmdir_empty_but_similar_prefix(self):
 
889
        """rmdir does not get confused by sibling paths.
 
890
 
 
891
        A naive implementation of MemoryTransport would refuse to rmdir
 
892
        ".bzr/branch" if there is a ".bzr/branch-format" directory, because it
 
893
        uses "path.startswith(dir)" on all file paths to determine if directory
 
894
        is empty.
 
895
        """
 
896
        t = self.get_transport()
 
897
        if t.is_readonly():
 
898
            return
 
899
        t.mkdir('foo')
 
900
        t.put_bytes('foo-bar', '')
 
901
        t.mkdir('foo-baz')
 
902
        t.rmdir('foo')
 
903
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
 
904
        self.failUnless(t.has('foo-bar'))
 
905
 
735
906
    def test_rename_dir_succeeds(self):
736
907
        t = self.get_transport()
737
908
        if t.is_readonly():
751
922
        t.mkdir('adir/asubdir')
752
923
        t.mkdir('bdir')
753
924
        t.mkdir('bdir/bsubdir')
 
925
        # any kind of PathError would be OK, though we normally expect
 
926
        # DirectoryNotEmpty
754
927
        self.assertRaises(PathError, t.rename, 'bdir', 'adir')
755
928
        # nothing was changed so it should still be as before
756
929
        self.assertTrue(t.has('bdir/bsubdir'))
757
930
        self.assertFalse(t.has('adir/bdir'))
758
931
        self.assertFalse(t.has('adir/bsubdir'))
759
932
 
 
933
    def test_rename_across_subdirs(self):
 
934
        t = self.get_transport()
 
935
        if t.is_readonly():
 
936
            raise TestNotApplicable("transport is readonly")
 
937
        t.mkdir('a')
 
938
        t.mkdir('b')
 
939
        ta = t.clone('a')
 
940
        tb = t.clone('b')
 
941
        ta.put_bytes('f', 'aoeu')
 
942
        ta.rename('f', '../b/f')
 
943
        self.assertTrue(tb.has('f'))
 
944
        self.assertFalse(ta.has('f'))
 
945
        self.assertTrue(t.has('b/f'))
 
946
 
760
947
    def test_delete_tree(self):
761
948
        t = self.get_transport()
762
949
 
772
959
        except TransportNotPossible:
773
960
            # ok, this transport does not support delete_tree
774
961
            return
775
 
        
 
962
 
776
963
        # did it delete that trivial case?
777
964
        self.assertRaises(NoSuchFile, t.stat, 'adir')
778
965
 
779
966
        self.build_tree(['adir/',
780
 
                         'adir/file', 
781
 
                         'adir/subdir/', 
782
 
                         'adir/subdir/file', 
 
967
                         'adir/file',
 
968
                         'adir/subdir/',
 
969
                         'adir/subdir/file',
783
970
                         'adir/subdir2/',
784
971
                         'adir/subdir2/file',
785
972
                         ], transport=t)
816
1003
        self.check_transport_contents('c this file\n', t, 'b')
817
1004
 
818
1005
        # TODO: Try to write a test for atomicity
819
 
        # TODO: Test moving into a non-existant subdirectory
 
1006
        # TODO: Test moving into a non-existent subdirectory
820
1007
        # TODO: Test Transport.move_multi
821
1008
 
822
1009
    def test_copy(self):
841
1028
        # TODO: test copy_multi
842
1029
 
843
1030
    def test_connection_error(self):
844
 
        """ConnectionError is raised when connection is impossible"""
 
1031
        """ConnectionError is raised when connection is impossible.
 
1032
 
 
1033
        The error should be raised from the first operation on the transport.
 
1034
        """
845
1035
        try:
846
1036
            url = self._server.get_bogus_url()
847
1037
        except NotImplementedError:
848
1038
            raise TestSkipped("Transport %s has no bogus URL support." %
849
1039
                              self._server.__class__)
850
 
        try:
851
 
            t = bzrlib.transport.get_transport(url)
852
 
            t.get('.bzr/branch')
853
 
        except (ConnectionError, NoSuchFile), e:
854
 
            pass
855
 
        except (Exception), e:
856
 
            self.fail('Wrong exception thrown (%s.%s): %s' 
857
 
                        % (e.__class__.__module__, e.__class__.__name__, e))
858
 
        else:
859
 
            self.fail('Did not get the expected ConnectionError or NoSuchFile.')
 
1040
        t = get_transport(url)
 
1041
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
860
1042
 
861
1043
    def test_stat(self):
862
1044
        # TODO: Test stat, just try once, and if it throws, stop testing
871
1053
            return
872
1054
 
873
1055
        paths = ['a', 'b/', 'b/c', 'b/d/', 'b/d/e']
874
 
        sizes = [14, 0, 16, 0, 18] 
 
1056
        sizes = [14, 0, 16, 0, 18]
875
1057
        self.build_tree(paths, transport=t, line_endings='binary')
876
1058
 
877
1059
        for path, size in zip(paths, sizes):
896
1078
        subdir.stat('./file')
897
1079
        subdir.stat('.')
898
1080
 
 
1081
    def test_hardlink(self):
 
1082
        from stat import ST_NLINK
 
1083
 
 
1084
        t = self.get_transport()
 
1085
 
 
1086
        source_name = "original_target"
 
1087
        link_name = "target_link"
 
1088
 
 
1089
        self.build_tree([source_name], transport=t)
 
1090
 
 
1091
        try:
 
1092
            t.hardlink(source_name, link_name)
 
1093
 
 
1094
            self.failUnless(t.has(source_name))
 
1095
            self.failUnless(t.has(link_name))
 
1096
 
 
1097
            st = t.stat(link_name)
 
1098
            self.failUnlessEqual(st[ST_NLINK], 2)
 
1099
        except TransportNotPossible:
 
1100
            raise TestSkipped("Transport %s does not support hardlinks." %
 
1101
                              self._server.__class__)
 
1102
 
 
1103
    def test_symlink(self):
 
1104
        from stat import S_ISLNK
 
1105
 
 
1106
        t = self.get_transport()
 
1107
 
 
1108
        source_name = "original_target"
 
1109
        link_name = "target_link"
 
1110
 
 
1111
        self.build_tree([source_name], transport=t)
 
1112
 
 
1113
        try:
 
1114
            t.symlink(source_name, link_name)
 
1115
 
 
1116
            self.failUnless(t.has(source_name))
 
1117
            self.failUnless(t.has(link_name))
 
1118
 
 
1119
            st = t.stat(link_name)
 
1120
            self.failUnless(S_ISLNK(st.st_mode),
 
1121
                "expected symlink, got mode %o" % st.st_mode)
 
1122
        except TransportNotPossible:
 
1123
            raise TestSkipped("Transport %s does not support symlinks." %
 
1124
                              self._server.__class__)
 
1125
        except IOError:
 
1126
            raise tests.KnownFailure("Paramiko fails to create symlinks during tests")
 
1127
 
899
1128
    def test_list_dir(self):
900
1129
        # TODO: Test list_dir, just try once, and if it throws, stop testing
901
1130
        t = self.get_transport()
902
 
        
 
1131
 
903
1132
        if not t.listable():
904
1133
            self.assertRaises(TransportNotPossible, t.list_dir, '.')
905
1134
            return
906
1135
 
907
 
        def sorted_list(d):
908
 
            l = list(t.list_dir(d))
 
1136
        def sorted_list(d, transport):
 
1137
            l = list(transport.list_dir(d))
909
1138
            l.sort()
910
1139
            return l
911
1140
 
912
 
        # SftpServer creates control files in the working directory
913
 
        # so lets move down a directory to avoid those.
914
 
        if not t.is_readonly():
915
 
            t.mkdir('wd')
916
 
        else:
917
 
            os.mkdir('wd')
918
 
        t = t.clone('wd')
919
 
 
920
 
        self.assertEqual([], sorted_list('.'))
 
1141
        self.assertEqual([], sorted_list('.', t))
921
1142
        # c2 is precisely one letter longer than c here to test that
922
1143
        # suffixing is not confused.
923
1144
        # a%25b checks that quoting is done consistently across transports
924
1145
        tree_names = ['a', 'a%25b', 'b', 'c/', 'c/d', 'c/e', 'c2/']
 
1146
 
925
1147
        if not t.is_readonly():
926
1148
            self.build_tree(tree_names, transport=t)
927
1149
        else:
928
 
            self.build_tree(['wd/' + name for name in tree_names])
929
 
 
930
 
        self.assertEqual(
931
 
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.'))
932
 
        self.assertEqual(['d', 'e'], sorted_list('c'))
 
1150
            self.build_tree(tree_names)
 
1151
 
 
1152
        self.assertEqual(
 
1153
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('', t))
 
1154
        self.assertEqual(
 
1155
            ['a', 'a%2525b', 'b', 'c', 'c2'], sorted_list('.', t))
 
1156
        self.assertEqual(['d', 'e'], sorted_list('c', t))
 
1157
 
 
1158
        # Cloning the transport produces an equivalent listing
 
1159
        self.assertEqual(['d', 'e'], sorted_list('', t.clone('c')))
933
1160
 
934
1161
        if not t.is_readonly():
935
1162
            t.delete('c/d')
936
1163
            t.delete('b')
937
1164
        else:
938
 
            os.unlink('wd/c/d')
939
 
            os.unlink('wd/b')
940
 
            
941
 
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.'))
942
 
        self.assertEqual(['e'], sorted_list('c'))
 
1165
            os.unlink('c/d')
 
1166
            os.unlink('b')
 
1167
 
 
1168
        self.assertEqual(['a', 'a%2525b', 'c', 'c2'], sorted_list('.', t))
 
1169
        self.assertEqual(['e'], sorted_list('c', t))
943
1170
 
944
1171
        self.assertListRaises(PathError, t.list_dir, 'q')
945
1172
        self.assertListRaises(PathError, t.list_dir, 'c/f')
 
1173
        # 'a' is a file, list_dir should raise an error
946
1174
        self.assertListRaises(PathError, t.list_dir, 'a')
947
1175
 
948
1176
    def test_list_dir_result_is_url_escaped(self):
954
1182
            self.build_tree(['a/', 'a/%'], transport=t)
955
1183
        else:
956
1184
            self.build_tree(['a/', 'a/%'])
957
 
        
 
1185
 
958
1186
        names = list(t.list_dir('a'))
959
1187
        self.assertEqual(['%25'], names)
960
1188
        self.assertIsInstance(names[0], str)
961
1189
 
 
1190
    def test_clone_preserve_info(self):
 
1191
        t1 = self.get_transport()
 
1192
        if not isinstance(t1, ConnectedTransport):
 
1193
            raise TestSkipped("not a connected transport")
 
1194
 
 
1195
        t2 = t1.clone('subdir')
 
1196
        self.assertEquals(t1._scheme, t2._scheme)
 
1197
        self.assertEquals(t1._user, t2._user)
 
1198
        self.assertEquals(t1._password, t2._password)
 
1199
        self.assertEquals(t1._host, t2._host)
 
1200
        self.assertEquals(t1._port, t2._port)
 
1201
 
 
1202
    def test__reuse_for(self):
 
1203
        t = self.get_transport()
 
1204
        if not isinstance(t, ConnectedTransport):
 
1205
            raise TestSkipped("not a connected transport")
 
1206
 
 
1207
        def new_url(scheme=None, user=None, password=None,
 
1208
                    host=None, port=None, path=None):
 
1209
            """Build a new url from t.base changing only parts of it.
 
1210
 
 
1211
            Only the parameters different from None will be changed.
 
1212
            """
 
1213
            if scheme   is None: scheme   = t._scheme
 
1214
            if user     is None: user     = t._user
 
1215
            if password is None: password = t._password
 
1216
            if user     is None: user     = t._user
 
1217
            if host     is None: host     = t._host
 
1218
            if port     is None: port     = t._port
 
1219
            if path     is None: path     = t._path
 
1220
            return t._unsplit_url(scheme, user, password, host, port, path)
 
1221
 
 
1222
        if t._scheme == 'ftp':
 
1223
            scheme = 'sftp'
 
1224
        else:
 
1225
            scheme = 'ftp'
 
1226
        self.assertIsNot(t, t._reuse_for(new_url(scheme=scheme)))
 
1227
        if t._user == 'me':
 
1228
            user = 'you'
 
1229
        else:
 
1230
            user = 'me'
 
1231
        self.assertIsNot(t, t._reuse_for(new_url(user=user)))
 
1232
        # passwords are not taken into account because:
 
1233
        # - it makes no sense to have two different valid passwords for the
 
1234
        #   same user
 
1235
        # - _password in ConnectedTransport is intended to collect what the
 
1236
        #   user specified from the command-line and there are cases where the
 
1237
        #   new url can contain no password (if the url was built from an
 
1238
        #   existing transport.base for example)
 
1239
        # - password are considered part of the credentials provided at
 
1240
        #   connection creation time and as such may not be present in the url
 
1241
        #   (they may be typed by the user when prompted for example)
 
1242
        self.assertIs(t, t._reuse_for(new_url(password='from space')))
 
1243
        # We will not connect, we can use a invalid host
 
1244
        self.assertIsNot(t, t._reuse_for(new_url(host=t._host + 'bar')))
 
1245
        if t._port == 1234:
 
1246
            port = 4321
 
1247
        else:
 
1248
            port = 1234
 
1249
        self.assertIsNot(t, t._reuse_for(new_url(port=port)))
 
1250
        # No point in trying to reuse a transport for a local URL
 
1251
        self.assertIs(None, t._reuse_for('/valid_but_not_existing'))
 
1252
 
 
1253
    def test_connection_sharing(self):
 
1254
        t = self.get_transport()
 
1255
        if not isinstance(t, ConnectedTransport):
 
1256
            raise TestSkipped("not a connected transport")
 
1257
 
 
1258
        c = t.clone('subdir')
 
1259
        # Some transports will create the connection  only when needed
 
1260
        t.has('surely_not') # Force connection
 
1261
        self.assertIs(t._get_connection(), c._get_connection())
 
1262
 
 
1263
        # Temporary failure, we need to create a new dummy connection
 
1264
        new_connection = None
 
1265
        t._set_connection(new_connection)
 
1266
        # Check that both transports use the same connection
 
1267
        self.assertIs(new_connection, t._get_connection())
 
1268
        self.assertIs(new_connection, c._get_connection())
 
1269
 
 
1270
    def test_reuse_connection_for_various_paths(self):
 
1271
        t = self.get_transport()
 
1272
        if not isinstance(t, ConnectedTransport):
 
1273
            raise TestSkipped("not a connected transport")
 
1274
 
 
1275
        t.has('surely_not') # Force connection
 
1276
        self.assertIsNot(None, t._get_connection())
 
1277
 
 
1278
        subdir = t._reuse_for(t.base + 'whatever/but/deep/down/the/path')
 
1279
        self.assertIsNot(t, subdir)
 
1280
        self.assertIs(t._get_connection(), subdir._get_connection())
 
1281
 
 
1282
        home = subdir._reuse_for(t.base + 'home')
 
1283
        self.assertIs(t._get_connection(), home._get_connection())
 
1284
        self.assertIs(subdir._get_connection(), home._get_connection())
 
1285
 
962
1286
    def test_clone(self):
963
1287
        # TODO: Test that clone moves up and down the filesystem
964
1288
        t1 = self.get_transport()
984
1308
        self.failIf(t3.has('b/d'))
985
1309
 
986
1310
        if t1.is_readonly():
987
 
            open('b/d', 'wb').write('newfile\n')
 
1311
            self.build_tree_contents([('b/d', 'newfile\n')])
988
1312
        else:
989
1313
            t2.put_bytes('d', 'newfile\n')
990
1314
 
992
1316
        self.failUnless(t2.has('d'))
993
1317
        self.failUnless(t3.has('b/d'))
994
1318
 
 
1319
    def test_clone_to_root(self):
 
1320
        orig_transport = self.get_transport()
 
1321
        # Repeatedly go up to a parent directory until we're at the root
 
1322
        # directory of this transport
 
1323
        root_transport = orig_transport
 
1324
        new_transport = root_transport.clone("..")
 
1325
        # as we are walking up directories, the path must be
 
1326
        # growing less, except at the top
 
1327
        self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1328
            or new_transport.base == root_transport.base)
 
1329
        while new_transport.base != root_transport.base:
 
1330
            root_transport = new_transport
 
1331
            new_transport = root_transport.clone("..")
 
1332
            # as we are walking up directories, the path must be
 
1333
            # growing less, except at the top
 
1334
            self.assertTrue(len(new_transport.base) < len(root_transport.base)
 
1335
                or new_transport.base == root_transport.base)
 
1336
 
 
1337
        # Cloning to "/" should take us to exactly the same location.
 
1338
        self.assertEqual(root_transport.base, orig_transport.clone("/").base)
 
1339
        # the abspath of "/" from the original transport should be the same
 
1340
        # as the base at the root:
 
1341
        self.assertEqual(orig_transport.abspath("/"), root_transport.base)
 
1342
 
 
1343
        # At the root, the URL must still end with / as its a directory
 
1344
        self.assertEqual(root_transport.base[-1], '/')
 
1345
 
 
1346
    def test_clone_from_root(self):
 
1347
        """At the root, cloning to a simple dir should just do string append."""
 
1348
        orig_transport = self.get_transport()
 
1349
        root_transport = orig_transport.clone('/')
 
1350
        self.assertEqual(root_transport.base + '.bzr/',
 
1351
            root_transport.clone('.bzr').base)
 
1352
 
 
1353
    def test_base_url(self):
 
1354
        t = self.get_transport()
 
1355
        self.assertEqual('/', t.base[-1])
 
1356
 
995
1357
    def test_relpath(self):
996
1358
        t = self.get_transport()
997
1359
        self.assertEqual('', t.relpath(t.base))
998
1360
        # base ends with /
999
1361
        self.assertEqual('', t.relpath(t.base[:-1]))
1000
 
        # subdirs which dont exist should still give relpaths.
 
1362
        # subdirs which don't exist should still give relpaths.
1001
1363
        self.assertEqual('foo', t.relpath(t.base + 'foo'))
1002
1364
        # trailing slash should be the same.
1003
1365
        self.assertEqual('foo', t.relpath(t.base + 'foo/'))
1019
1381
        # that have aliasing problems like symlinks should go in backend
1020
1382
        # specific test cases.
1021
1383
        transport = self.get_transport()
1022
 
        
1023
 
        # disabled because some transports might normalize urls in generating
1024
 
        # the abspath - eg http+pycurl-> just http -- mbp 20060308 
 
1384
 
1025
1385
        self.assertEqual(transport.base + 'relpath',
1026
1386
                         transport.abspath('relpath'))
1027
1387
 
 
1388
        # This should work without raising an error.
 
1389
        transport.abspath("/")
 
1390
 
 
1391
        # the abspath of "/" and "/foo/.." should result in the same location
 
1392
        self.assertEqual(transport.abspath("/"), transport.abspath("/foo/.."))
 
1393
 
 
1394
        self.assertEqual(transport.clone("/").abspath('foo'),
 
1395
                         transport.abspath("/foo"))
 
1396
 
 
1397
    def test_win32_abspath(self):
 
1398
        # Note: we tried to set sys.platform='win32' so we could test on
 
1399
        # other platforms too, but then osutils does platform specific
 
1400
        # things at import time which defeated us...
 
1401
        if sys.platform != 'win32':
 
1402
            raise TestSkipped(
 
1403
                'Testing drive letters in abspath implemented only for win32')
 
1404
 
 
1405
        # smoke test for abspath on win32.
 
1406
        # a transport based on 'file:///' never fully qualifies the drive.
 
1407
        transport = get_transport("file:///")
 
1408
        self.failUnlessEqual(transport.abspath("/"), "file:///")
 
1409
 
 
1410
        # but a transport that starts with a drive spec must keep it.
 
1411
        transport = get_transport("file:///C:/")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
 
1413
 
1028
1414
    def test_local_abspath(self):
1029
1415
        transport = self.get_transport()
1030
1416
        try:
1031
1417
            p = transport.local_abspath('.')
1032
 
        except TransportNotPossible:
1033
 
            pass # This is not a local transport
 
1418
        except (errors.NotLocalUrl, TransportNotPossible), e:
 
1419
            # should be formattable
 
1420
            s = str(e)
1034
1421
        else:
1035
1422
            self.assertEqual(getcwd(), p)
1036
1423
 
1106
1493
                         'to/dir/b%2525z',
1107
1494
                         'to/bar',]))
1108
1495
 
 
1496
    def test_copy_tree_to_transport(self):
 
1497
        transport = self.get_transport()
 
1498
        if not transport.listable():
 
1499
            self.assertRaises(TransportNotPossible,
 
1500
                              transport.iter_files_recursive)
 
1501
            return
 
1502
        if transport.is_readonly():
 
1503
            return
 
1504
        self.build_tree(['from/',
 
1505
                         'from/dir/',
 
1506
                         'from/dir/foo',
 
1507
                         'from/dir/bar',
 
1508
                         'from/dir/b%25z', # make sure quoting is correct
 
1509
                         'from/bar'],
 
1510
                        transport=transport)
 
1511
        from_transport = transport.clone('from')
 
1512
        to_transport = transport.clone('to')
 
1513
        to_transport.ensure_base()
 
1514
        from_transport.copy_tree_to_transport(to_transport)
 
1515
        paths = set(transport.iter_files_recursive())
 
1516
        self.assertEqual(paths,
 
1517
                    set(['from/dir/foo',
 
1518
                         'from/dir/bar',
 
1519
                         'from/dir/b%2525z',
 
1520
                         'from/bar',
 
1521
                         'to/dir/foo',
 
1522
                         'to/dir/bar',
 
1523
                         'to/dir/b%2525z',
 
1524
                         'to/bar',]))
 
1525
 
1109
1526
    def test_unicode_paths(self):
1110
1527
        """Test that we can read/write files with Unicode names."""
1111
1528
        t = self.get_transport()
1122
1539
                 u'\u65e5', # Kanji person
1123
1540
                ]
1124
1541
 
 
1542
        no_unicode_support = getattr(self._server, 'no_unicode_support', False)
 
1543
        if no_unicode_support:
 
1544
            raise tests.KnownFailure("test server cannot handle unicode paths")
 
1545
 
1125
1546
        try:
1126
1547
            self.build_tree(files, transport=t, line_endings='binary')
1127
1548
        except UnicodeError:
1137
1558
            self.check_transport_contents(contents, t, urlutils.escape(fname))
1138
1559
 
1139
1560
    def test_connect_twice_is_same_content(self):
1140
 
        # check that our server (whatever it is) is accessable reliably
 
1561
        # check that our server (whatever it is) is accessible reliably
1141
1562
        # via get_transport and multiple connections share content.
1142
1563
        transport = self.get_transport()
1143
1564
        if transport.is_readonly():
1144
1565
            return
1145
1566
        transport.put_bytes('foo', 'bar')
1146
 
        transport2 = self.get_transport()
1147
 
        self.check_transport_contents('bar', transport2, 'foo')
1148
 
        # its base should be usable.
1149
 
        transport2 = bzrlib.transport.get_transport(transport.base)
1150
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1567
        transport3 = self.get_transport()
 
1568
        self.check_transport_contents('bar', transport3, 'foo')
1151
1569
 
1152
1570
        # now opening at a relative url should give use a sane result:
1153
1571
        transport.mkdir('newdir')
1154
 
        transport2 = bzrlib.transport.get_transport(transport.base + "newdir")
1155
 
        transport2 = transport2.clone('..')
1156
 
        self.check_transport_contents('bar', transport2, 'foo')
 
1572
        transport5 = self.get_transport('newdir')
 
1573
        transport6 = transport5.clone('..')
 
1574
        self.check_transport_contents('bar', transport6, 'foo')
1157
1575
 
1158
1576
    def test_lock_write(self):
 
1577
        """Test transport-level write locks.
 
1578
 
 
1579
        These are deprecated and transports may decline to support them.
 
1580
        """
1159
1581
        transport = self.get_transport()
1160
1582
        if transport.is_readonly():
1161
1583
            self.assertRaises(TransportNotPossible, transport.lock_write, 'foo')
1162
1584
            return
1163
1585
        transport.put_bytes('lock', '')
1164
 
        lock = transport.lock_write('lock')
 
1586
        try:
 
1587
            lock = transport.lock_write('lock')
 
1588
        except TransportNotPossible:
 
1589
            return
1165
1590
        # TODO make this consistent on all platforms:
1166
1591
        # self.assertRaises(LockError, transport.lock_write, 'lock')
1167
1592
        lock.unlock()
1168
1593
 
1169
1594
    def test_lock_read(self):
 
1595
        """Test transport-level read locks.
 
1596
 
 
1597
        These are deprecated and transports may decline to support them.
 
1598
        """
1170
1599
        transport = self.get_transport()
1171
1600
        if transport.is_readonly():
1172
1601
            file('lock', 'w').close()
1173
1602
        else:
1174
1603
            transport.put_bytes('lock', '')
1175
 
        lock = transport.lock_read('lock')
 
1604
        try:
 
1605
            lock = transport.lock_read('lock')
 
1606
        except TransportNotPossible:
 
1607
            return
1176
1608
        # TODO make this consistent on all platforms:
1177
1609
        # self.assertRaises(LockError, transport.lock_read, 'lock')
1178
1610
        lock.unlock()
1184
1616
        else:
1185
1617
            transport.put_bytes('a', '0123456789')
1186
1618
 
 
1619
        d = list(transport.readv('a', ((0, 1),)))
 
1620
        self.assertEqual(d[0], (0, '0'))
 
1621
 
1187
1622
        d = list(transport.readv('a', ((0, 1), (1, 1), (3, 2), (9, 1))))
1188
1623
        self.assertEqual(d[0], (0, '0'))
1189
1624
        self.assertEqual(d[1], (1, '1'))
1202
1637
        self.assertEqual(d[1], (9, '9'))
1203
1638
        self.assertEqual(d[2], (0, '0'))
1204
1639
        self.assertEqual(d[3], (3, '34'))
 
1640
 
 
1641
    def test_readv_with_adjust_for_latency(self):
 
1642
        transport = self.get_transport()
 
1643
        # the adjust for latency flag expands the data region returned
 
1644
        # according to a per-transport heuristic, so testing is a little
 
1645
        # tricky as we need more data than the largest combining that our
 
1646
        # transports do. To accomodate this we generate random data and cross
 
1647
        # reference the returned data with the random data. To avoid doing
 
1648
        # multiple large random byte look ups we do several tests on the same
 
1649
        # backing data.
 
1650
        content = osutils.rand_bytes(200*1024)
 
1651
        content_size = len(content)
 
1652
        if transport.is_readonly():
 
1653
            self.build_tree_contents([('a', content)])
 
1654
        else:
 
1655
            transport.put_bytes('a', content)
 
1656
        def check_result_data(result_vector):
 
1657
            for item in result_vector:
 
1658
                data_len = len(item[1])
 
1659
                self.assertEqual(content[item[0]:item[0] + data_len], item[1])
 
1660
 
 
1661
        # start corner case
 
1662
        result = list(transport.readv('a', ((0, 30),),
 
1663
            adjust_for_latency=True, upper_limit=content_size))
 
1664
        # we expect 1 result, from 0, to something > 30
 
1665
        self.assertEqual(1, len(result))
 
1666
        self.assertEqual(0, result[0][0])
 
1667
        self.assertTrue(len(result[0][1]) >= 30)
 
1668
        check_result_data(result)
 
1669
        # end of file corner case
 
1670
        result = list(transport.readv('a', ((204700, 100),),
 
1671
            adjust_for_latency=True, upper_limit=content_size))
 
1672
        # we expect 1 result, from 204800- its length, to the end
 
1673
        self.assertEqual(1, len(result))
 
1674
        data_len = len(result[0][1])
 
1675
        self.assertEqual(204800-data_len, result[0][0])
 
1676
        self.assertTrue(data_len >= 100)
 
1677
        check_result_data(result)
 
1678
        # out of order ranges are made in order
 
1679
        result = list(transport.readv('a', ((204700, 100), (0, 50)),
 
1680
            adjust_for_latency=True, upper_limit=content_size))
 
1681
        # we expect 2 results, in order, start and end.
 
1682
        self.assertEqual(2, len(result))
 
1683
        # start
 
1684
        data_len = len(result[0][1])
 
1685
        self.assertEqual(0, result[0][0])
 
1686
        self.assertTrue(data_len >= 30)
 
1687
        # end
 
1688
        data_len = len(result[1][1])
 
1689
        self.assertEqual(204800-data_len, result[1][0])
 
1690
        self.assertTrue(data_len >= 100)
 
1691
        check_result_data(result)
 
1692
        # close ranges get combined (even if out of order)
 
1693
        for request_vector in [((400,50), (800, 234)), ((800, 234), (400,50))]:
 
1694
            result = list(transport.readv('a', request_vector,
 
1695
                adjust_for_latency=True, upper_limit=content_size))
 
1696
            self.assertEqual(1, len(result))
 
1697
            data_len = len(result[0][1])
 
1698
            # minimum length is from 400 to 1034 - 634
 
1699
            self.assertTrue(data_len >= 634)
 
1700
            # must contain the region 400 to 1034
 
1701
            self.assertTrue(result[0][0] <= 400)
 
1702
            self.assertTrue(result[0][0] + data_len >= 1034)
 
1703
            check_result_data(result)
 
1704
 
 
1705
    def test_readv_with_adjust_for_latency_with_big_file(self):
 
1706
        transport = self.get_transport()
 
1707
        # test from observed failure case.
 
1708
        if transport.is_readonly():
 
1709
            file('a', 'w').write('a'*1024*1024)
 
1710
        else:
 
1711
            transport.put_bytes('a', 'a'*1024*1024)
 
1712
        broken_vector = [(465219, 800), (225221, 800), (445548, 800),
 
1713
            (225037, 800), (221357, 800), (437077, 800), (947670, 800),
 
1714
            (465373, 800), (947422, 800)]
 
1715
        results = list(transport.readv('a', broken_vector, True, 1024*1024))
 
1716
        found_items = [False]*9
 
1717
        for pos, (start, length) in enumerate(broken_vector):
 
1718
            # check the range is covered by the result
 
1719
            for offset, data in results:
 
1720
                if offset <= start and start + length <= offset + len(data):
 
1721
                    found_items[pos] = True
 
1722
        self.assertEqual([True]*9, found_items)
 
1723
 
 
1724
    def test_get_with_open_write_stream_sees_all_content(self):
 
1725
        t = self.get_transport()
 
1726
        if t.is_readonly():
 
1727
            return
 
1728
        handle = t.open_write_stream('foo')
 
1729
        try:
 
1730
            handle.write('bcd')
 
1731
            self.assertEqual([(0, 'b'), (2, 'd')], list(t.readv('foo', ((0,1), (2,1)))))
 
1732
        finally:
 
1733
            handle.close()
 
1734
 
 
1735
    def test_get_smart_medium(self):
 
1736
        """All transports must either give a smart medium, or know they can't.
 
1737
        """
 
1738
        transport = self.get_transport()
 
1739
        try:
 
1740
            client_medium = transport.get_smart_medium()
 
1741
            self.assertIsInstance(client_medium, medium.SmartClientMedium)
 
1742
        except errors.NoSmartMedium:
 
1743
            # as long as we got it we're fine
 
1744
            pass
 
1745
 
 
1746
    def test_readv_short_read(self):
 
1747
        transport = self.get_transport()
 
1748
        if transport.is_readonly():
 
1749
            file('a', 'w').write('0123456789')
 
1750
        else:
 
1751
            transport.put_bytes('a', '01234567890')
 
1752
 
 
1753
        # This is intentionally reading off the end of the file
 
1754
        # since we are sure that it cannot get there
 
1755
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange,
 
1756
                               # Can be raised by paramiko
 
1757
                               AssertionError),
 
1758
                              transport.readv, 'a', [(1,1), (8,10)])
 
1759
 
 
1760
        # This is trying to seek past the end of the file, it should
 
1761
        # also raise a special error
 
1762
        self.assertListRaises((errors.ShortReadvError, errors.InvalidRange),
 
1763
                              transport.readv, 'a', [(12,2)])
 
1764
 
 
1765
    def test_stat_symlink(self):
 
1766
        # if a transport points directly to a symlink (and supports symlinks
 
1767
        # at all) you can tell this.  helps with bug 32669.
 
1768
        t = self.get_transport()
 
1769
        try:
 
1770
            t.symlink('target', 'link')
 
1771
        except TransportNotPossible:
 
1772
            raise TestSkipped("symlinks not supported")
 
1773
        t2 = t.clone('link')
 
1774
        st = t2.stat('')
 
1775
        self.assertTrue(stat.S_ISLNK(st.st_mode))