~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_transport.py

(gz) Change minimum required testtools version for selftest to 0.9.5 for
 unicode fixes (Martin [gz])

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
32
32
    osutils,
33
33
    pyutils,
34
34
    tests,
35
 
    transport as _mod_transport,
36
35
    urlutils,
37
36
    )
38
37
from bzrlib.errors import (ConnectionError,
53
52
from bzrlib.tests.test_transport import TestTransportImplementation
54
53
from bzrlib.transport import (
55
54
    ConnectedTransport,
 
55
    get_transport,
56
56
    _get_transport_modules,
57
57
    )
58
58
from bzrlib.transport.memory import MemoryTransport
98
98
 
99
99
    def setUp(self):
100
100
        super(TransportTests, self).setUp()
101
 
        self.overrideEnv('BZR_NO_SMART_VFS', None)
 
101
        self._captureVar('BZR_NO_SMART_VFS', None)
102
102
 
103
103
    def check_transport_contents(self, content, transport, relpath):
104
 
        """Check that transport.get_bytes(relpath) == content."""
105
 
        self.assertEqualDiff(content, transport.get_bytes(relpath))
 
104
        """Check that transport.get(relpath).read() == content."""
 
105
        self.assertEqualDiff(content, transport.get(relpath).read())
106
106
 
107
107
    def test_ensure_base_missing(self):
108
108
        """.ensure_base() should create the directory if it doesn't exist"""
256
256
        handle = t.open_write_stream('foo')
257
257
        try:
258
258
            handle.write('b')
259
 
            self.assertEqual('b', t.get_bytes('foo'))
 
259
            self.assertEqual('b', t.get('foo').read())
260
260
        finally:
261
261
            handle.close()
262
262
 
268
268
        try:
269
269
            handle.write('b')
270
270
            self.assertEqual('b', t.get_bytes('foo'))
271
 
            f = t.get('foo')
272
 
            try:
273
 
                self.assertEqual('b', f.read())
274
 
            finally:
275
 
                f.close()
 
271
            self.assertEqual('b', t.get('foo').read())
276
272
        finally:
277
273
            handle.close()
278
274
 
285
281
            return
286
282
 
287
283
        t.put_bytes('a', 'some text for a\n')
288
 
        self.assertTrue(t.has('a'))
 
284
        self.failUnless(t.has('a'))
289
285
        self.check_transport_contents('some text for a\n', t, 'a')
290
286
 
291
287
        # The contents should be overwritten
303
299
                    t.put_bytes_non_atomic, 'a', 'some text for a\n')
304
300
            return
305
301
 
306
 
        self.assertFalse(t.has('a'))
 
302
        self.failIf(t.has('a'))
307
303
        t.put_bytes_non_atomic('a', 'some text for a\n')
308
 
        self.assertTrue(t.has('a'))
 
304
        self.failUnless(t.has('a'))
309
305
        self.check_transport_contents('some text for a\n', t, 'a')
310
306
        # Put also replaces contents
311
307
        t.put_bytes_non_atomic('a', 'new\ncontents for\na\n')
323
319
        # Now test the create_parent flag
324
320
        self.assertRaises(NoSuchFile, t.put_bytes_non_atomic, 'dir/a',
325
321
                                       'contents\n')
326
 
        self.assertFalse(t.has('dir/a'))
 
322
        self.failIf(t.has('dir/a'))
327
323
        t.put_bytes_non_atomic('dir/a', 'contents for dir/a\n',
328
324
                               create_parent_dir=True)
329
325
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
401
397
        result = t.put_file('a', StringIO('some text for a\n'))
402
398
        # put_file returns the length of the data written
403
399
        self.assertEqual(16, result)
404
 
        self.assertTrue(t.has('a'))
 
400
        self.failUnless(t.has('a'))
405
401
        self.check_transport_contents('some text for a\n', t, 'a')
406
402
        # Put also replaces contents
407
403
        result = t.put_file('a', StringIO('new\ncontents for\na\n'))
419
415
                    t.put_file_non_atomic, 'a', StringIO('some text for a\n'))
420
416
            return
421
417
 
422
 
        self.assertFalse(t.has('a'))
 
418
        self.failIf(t.has('a'))
423
419
        t.put_file_non_atomic('a', StringIO('some text for a\n'))
424
 
        self.assertTrue(t.has('a'))
 
420
        self.failUnless(t.has('a'))
425
421
        self.check_transport_contents('some text for a\n', t, 'a')
426
422
        # Put also replaces contents
427
423
        t.put_file_non_atomic('a', StringIO('new\ncontents for\na\n'))
439
435
        # Now test the create_parent flag
440
436
        self.assertRaises(NoSuchFile, t.put_file_non_atomic, 'dir/a',
441
437
                                       StringIO('contents\n'))
442
 
        self.assertFalse(t.has('dir/a'))
 
438
        self.failIf(t.has('dir/a'))
443
439
        t.put_file_non_atomic('dir/a', StringIO('contents for dir/a\n'),
444
440
                              create_parent_dir=True)
445
441
        self.check_transport_contents('contents for dir/a\n', t, 'dir/a')
644
640
            self.build_tree(files, transport=transport_from)
645
641
            self.assertEqual(4, transport_from.copy_to(files, transport_to))
646
642
            for f in files:
647
 
                self.check_transport_contents(transport_to.get_bytes(f),
 
643
                self.check_transport_contents(transport_to.get(f).read(),
648
644
                                              transport_from, f)
649
645
 
650
646
        t = self.get_transport()
673
669
        files = ['a', 'b', 'c', 'd']
674
670
        t.copy_to(iter(files), temp_transport)
675
671
        for f in files:
676
 
            self.check_transport_contents(temp_transport.get_bytes(f),
 
672
            self.check_transport_contents(temp_transport.get(f).read(),
677
673
                                          t, f)
678
674
        del temp_transport
679
675
 
822
818
            return
823
819
 
824
820
        t.put_bytes('a', 'a little bit of text\n')
825
 
        self.assertTrue(t.has('a'))
 
821
        self.failUnless(t.has('a'))
826
822
        t.delete('a')
827
 
        self.assertFalse(t.has('a'))
 
823
        self.failIf(t.has('a'))
828
824
 
829
825
        self.assertRaises(NoSuchFile, t.delete, 'a')
830
826
 
836
832
        t.delete_multi(['a', 'c'])
837
833
        self.assertEqual([False, True, False],
838
834
                list(t.has_multi(['a', 'b', 'c'])))
839
 
        self.assertFalse(t.has('a'))
840
 
        self.assertTrue(t.has('b'))
841
 
        self.assertFalse(t.has('c'))
 
835
        self.failIf(t.has('a'))
 
836
        self.failUnless(t.has('b'))
 
837
        self.failIf(t.has('c'))
842
838
 
843
839
        self.assertRaises(NoSuchFile,
844
840
                t.delete_multi, ['a', 'b', 'c'])
905
901
        t.mkdir('foo-baz')
906
902
        t.rmdir('foo')
907
903
        self.assertRaises((NoSuchFile, PathError), t.rmdir, 'foo')
908
 
        self.assertTrue(t.has('foo-bar'))
 
904
        self.failUnless(t.has('foo-bar'))
909
905
 
910
906
    def test_rename_dir_succeeds(self):
911
907
        t = self.get_transport()
994
990
        self.assertEquals([True, False], list(t.has_multi(['a', 'b'])))
995
991
 
996
992
        t.move('a', 'b')
997
 
        self.assertTrue(t.has('b'))
998
 
        self.assertFalse(t.has('a'))
 
993
        self.failUnless(t.has('b'))
 
994
        self.failIf(t.has('a'))
999
995
 
1000
996
        self.check_transport_contents('a first file\n', t, 'b')
1001
997
        self.assertEquals([False, True], list(t.has_multi(['a', 'b'])))
1003
999
        # Overwrite a file
1004
1000
        t.put_bytes('c', 'c this file\n')
1005
1001
        t.move('c', 'b')
1006
 
        self.assertFalse(t.has('c'))
 
1002
        self.failIf(t.has('c'))
1007
1003
        self.check_transport_contents('c this file\n', t, 'b')
1008
1004
 
1009
1005
        # TODO: Try to write a test for atomicity
1041
1037
        except NotImplementedError:
1042
1038
            raise TestSkipped("Transport %s has no bogus URL support." %
1043
1039
                              self._server.__class__)
1044
 
        t = _mod_transport.get_transport(url)
 
1040
        t = get_transport(url)
1045
1041
        self.assertRaises((ConnectionError, NoSuchFile), t.get, '.bzr/branch')
1046
1042
 
1047
1043
    def test_stat(self):
1063
1059
        for path, size in zip(paths, sizes):
1064
1060
            st = t.stat(path)
1065
1061
            if path.endswith('/'):
1066
 
                self.assertTrue(S_ISDIR(st.st_mode))
 
1062
                self.failUnless(S_ISDIR(st.st_mode))
1067
1063
                # directory sizes are meaningless
1068
1064
            else:
1069
 
                self.assertTrue(S_ISREG(st.st_mode))
 
1065
                self.failUnless(S_ISREG(st.st_mode))
1070
1066
                self.assertEqual(size, st.st_size)
1071
1067
 
1072
1068
        remote_stats = list(t.stat_multi(paths))
1095
1091
        try:
1096
1092
            t.hardlink(source_name, link_name)
1097
1093
 
1098
 
            self.assertTrue(t.has(source_name))
1099
 
            self.assertTrue(t.has(link_name))
 
1094
            self.failUnless(t.has(source_name))
 
1095
            self.failUnless(t.has(link_name))
1100
1096
 
1101
1097
            st = t.stat(link_name)
1102
 
            self.assertEqual(st[ST_NLINK], 2)
 
1098
            self.failUnlessEqual(st[ST_NLINK], 2)
1103
1099
        except TransportNotPossible:
1104
1100
            raise TestSkipped("Transport %s does not support hardlinks." %
1105
1101
                              self._server.__class__)
1117
1113
        try:
1118
1114
            t.symlink(source_name, link_name)
1119
1115
 
1120
 
            self.assertTrue(t.has(source_name))
1121
 
            self.assertTrue(t.has(link_name))
 
1116
            self.failUnless(t.has(source_name))
 
1117
            self.failUnless(t.has(link_name))
1122
1118
 
1123
1119
            st = t.stat(link_name)
1124
 
            self.assertTrue(S_ISLNK(st.st_mode),
 
1120
            self.failUnless(S_ISLNK(st.st_mode),
1125
1121
                "expected symlink, got mode %o" % st.st_mode)
1126
1122
        except TransportNotPossible:
1127
1123
            raise TestSkipped("Transport %s does not support symlinks." %
1293
1289
 
1294
1290
        self.build_tree(['a', 'b/', 'b/c'], transport=t1)
1295
1291
 
1296
 
        self.assertTrue(t1.has('a'))
1297
 
        self.assertTrue(t1.has('b/c'))
1298
 
        self.assertFalse(t1.has('c'))
 
1292
        self.failUnless(t1.has('a'))
 
1293
        self.failUnless(t1.has('b/c'))
 
1294
        self.failIf(t1.has('c'))
1299
1295
 
1300
1296
        t2 = t1.clone('b')
1301
1297
        self.assertEqual(t1.base + 'b/', t2.base)
1302
1298
 
1303
 
        self.assertTrue(t2.has('c'))
1304
 
        self.assertFalse(t2.has('a'))
 
1299
        self.failUnless(t2.has('c'))
 
1300
        self.failIf(t2.has('a'))
1305
1301
 
1306
1302
        t3 = t2.clone('..')
1307
 
        self.assertTrue(t3.has('a'))
1308
 
        self.assertFalse(t3.has('c'))
 
1303
        self.failUnless(t3.has('a'))
 
1304
        self.failIf(t3.has('c'))
1309
1305
 
1310
 
        self.assertFalse(t1.has('b/d'))
1311
 
        self.assertFalse(t2.has('d'))
1312
 
        self.assertFalse(t3.has('b/d'))
 
1306
        self.failIf(t1.has('b/d'))
 
1307
        self.failIf(t2.has('d'))
 
1308
        self.failIf(t3.has('b/d'))
1313
1309
 
1314
1310
        if t1.is_readonly():
1315
1311
            self.build_tree_contents([('b/d', 'newfile\n')])
1316
1312
        else:
1317
1313
            t2.put_bytes('d', 'newfile\n')
1318
1314
 
1319
 
        self.assertTrue(t1.has('b/d'))
1320
 
        self.assertTrue(t2.has('d'))
1321
 
        self.assertTrue(t3.has('b/d'))
 
1315
        self.failUnless(t1.has('b/d'))
 
1316
        self.failUnless(t2.has('d'))
 
1317
        self.failUnless(t3.has('b/d'))
1322
1318
 
1323
1319
    def test_clone_to_root(self):
1324
1320
        orig_transport = self.get_transport()
1398
1394
        self.assertEqual(transport.clone("/").abspath('foo'),
1399
1395
                         transport.abspath("/foo"))
1400
1396
 
1401
 
    # GZ 2011-01-26: Test in per_transport but not using self.get_transport?
1402
1397
    def test_win32_abspath(self):
1403
1398
        # Note: we tried to set sys.platform='win32' so we could test on
1404
1399
        # other platforms too, but then osutils does platform specific
1409
1404
 
1410
1405
        # smoke test for abspath on win32.
1411
1406
        # a transport based on 'file:///' never fully qualifies the drive.
1412
 
        transport = _mod_transport.get_transport("file:///")
1413
 
        self.assertEqual(transport.abspath("/"), "file:///")
 
1407
        transport = get_transport("file:///")
 
1408
        self.failUnlessEqual(transport.abspath("/"), "file:///")
1414
1409
 
1415
1410
        # but a transport that starts with a drive spec must keep it.
1416
 
        transport = _mod_transport.get_transport("file:///C:/")
1417
 
        self.assertEqual(transport.abspath("/"), "file:///C:/")
 
1411
        transport = get_transport("file:///C:/")
 
1412
        self.failUnlessEqual(transport.abspath("/"), "file:///C:/")
1418
1413
 
1419
1414
    def test_local_abspath(self):
1420
1415
        transport = self.get_transport()