~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Patch Queue Manager
  • Date: 2013-05-20 17:46:29 UTC
  • mfrom: (6573.1.1 bzr)
  • Revision ID: pqm@pqm.ubuntu.com-20130520174629-dp7zujtuclvomuzd
(jameinel) Fix CVE 2013-2009. Avoid allowing multiple wildcards in a single
 SSL cert hostname segment. (Andrew Starr-Bochicchio)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
import errno
21
21
import os
22
22
import re
 
23
import select
23
24
import socket
24
25
import sys
25
26
import time
38
39
    file_utils,
39
40
    test__walkdirs_win32,
40
41
    )
41
 
 
42
 
 
43
 
class _UTF8DirReaderFeature(tests.Feature):
 
42
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
43
 
 
44
 
 
45
class _UTF8DirReaderFeature(features.Feature):
44
46
 
45
47
    def _probe(self):
46
48
        try:
53
55
    def feature_name(self):
54
56
        return 'bzrlib._readdir_pyx'
55
57
 
56
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
58
UTF8DirReaderFeature = features.ModuleAvailableFeature('bzrlib._readdir_pyx')
57
59
 
58
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
60
term_ios_feature = features.ModuleAvailableFeature('termios')
59
61
 
60
62
 
61
63
def _already_unicode(s):
96
98
    return scenarios
97
99
 
98
100
 
99
 
def load_tests(basic_tests, module, loader):
100
 
    suite = loader.suiteClass()
101
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
102
 
        basic_tests, tests.condition_isinstance(TestDirReader))
103
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
104
 
    suite.addTest(remaining_tests)
105
 
    return suite
 
101
load_tests = load_tests_apply_scenarios
106
102
 
107
103
 
108
104
class TestContainsWhitespace(tests.TestCase):
109
105
 
110
106
    def test_contains_whitespace(self):
111
 
        self.failUnless(osutils.contains_whitespace(u' '))
112
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
116
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
107
        self.assertTrue(osutils.contains_whitespace(u' '))
 
108
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
109
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
110
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
117
113
 
118
114
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
119
115
        # is whitespace, but we do not.
120
 
        self.failIf(osutils.contains_whitespace(u''))
121
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
122
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
116
        self.assertFalse(osutils.contains_whitespace(u''))
 
117
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
118
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
123
119
 
124
120
 
125
121
class TestRename(tests.TestCaseInTempDir):
139
135
        # This should work everywhere
140
136
        self.create_file('a', 'something in a\n')
141
137
        self._fancy_rename('a', 'b')
142
 
        self.failIfExists('a')
143
 
        self.failUnlessExists('b')
 
138
        self.assertPathDoesNotExist('a')
 
139
        self.assertPathExists('b')
144
140
        self.check_file_contents('b', 'something in a\n')
145
141
 
146
142
        self.create_file('a', 'new something in a\n')
153
149
        self.create_file('target', 'data in target\n')
154
150
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
151
                          'missingsource', 'target')
156
 
        self.failUnlessExists('target')
 
152
        self.assertPathExists('target')
157
153
        self.check_file_contents('target', 'data in target\n')
158
154
 
159
155
    def test_fancy_rename_fails_if_source_and_target_missing(self):
164
160
        # Rename should be semi-atomic on all platforms
165
161
        self.create_file('a', 'something in a\n')
166
162
        osutils.rename('a', 'b')
167
 
        self.failIfExists('a')
168
 
        self.failUnlessExists('b')
 
163
        self.assertPathDoesNotExist('a')
 
164
        self.assertPathExists('b')
169
165
        self.check_file_contents('b', 'something in a\n')
170
166
 
171
167
        self.create_file('a', 'new something in a\n')
185
181
        shape = sorted(os.listdir('.'))
186
182
        self.assertEquals(['A', 'B'], shape)
187
183
 
 
184
    def test_rename_exception(self):
 
185
        try:
 
186
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
187
        except OSError, e:
 
188
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
189
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
190
            self.assertTrue('nonexistent_path' in e.strerror)
 
191
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
192
 
188
193
 
189
194
class TestRandChars(tests.TestCase):
190
195
 
236
241
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
237
242
 
238
243
 
 
244
class TestLstat(tests.TestCaseInTempDir):
 
245
 
 
246
    def test_lstat_matches_fstat(self):
 
247
        # On Windows, lstat and fstat don't always agree, primarily in the
 
248
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
249
        # custom implementation.
 
250
        if sys.platform == 'win32':
 
251
            # We only have special lstat/fstat if we have the extension.
 
252
            # Without it, we may end up re-reading content when we don't have
 
253
            # to, but otherwise it doesn't affect correctness.
 
254
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
255
        f = open('test-file.txt', 'wb')
 
256
        self.addCleanup(f.close)
 
257
        f.write('some content\n')
 
258
        f.flush()
 
259
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
260
                             osutils.lstat('test-file.txt'))
 
261
 
 
262
 
239
263
class TestRmTree(tests.TestCaseInTempDir):
240
264
 
241
265
    def test_rmtree(self):
253
277
 
254
278
        osutils.rmtree('dir')
255
279
 
256
 
        self.failIfExists('dir/file')
257
 
        self.failIfExists('dir')
 
280
        self.assertPathDoesNotExist('dir/file')
 
281
        self.assertPathDoesNotExist('dir')
258
282
 
259
283
 
260
284
class TestDeleteAny(tests.TestCaseInTempDir):
415
439
class TestLinks(tests.TestCaseInTempDir):
416
440
 
417
441
    def test_dereference_path(self):
418
 
        self.requireFeature(tests.SymlinkFeature)
 
442
        self.requireFeature(features.SymlinkFeature)
419
443
        cwd = osutils.realpath('.')
420
444
        os.mkdir('bar')
421
445
        bar_path = osutils.pathjoin(cwd, 'bar')
468
492
 
469
493
class TestCanonicalRelPath(tests.TestCaseInTempDir):
470
494
 
471
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
495
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
472
496
 
473
497
    def test_canonical_relpath_simple(self):
474
498
        f = file('MixedCaseName', 'w')
475
499
        f.close()
476
500
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
477
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
501
        self.assertEqual('work/MixedCaseName', actual)
478
502
 
479
503
    def test_canonical_relpath_missing_tail(self):
480
504
        os.mkdir('MixedCaseParent')
481
505
        actual = osutils.canonical_relpath(self.test_base_dir,
482
506
                                           'mixedcaseparent/nochild')
483
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
507
        self.assertEqual('work/MixedCaseParent/nochild', actual)
484
508
 
485
509
 
486
510
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
530
554
    """Test pumpfile method."""
531
555
 
532
556
    def setUp(self):
533
 
        tests.TestCase.setUp(self)
 
557
        super(TestPumpFile, self).setUp()
534
558
        # create a test datablock
535
559
        self.block_size = 512
536
560
        pattern = '0123456789ABCDEF'
804
828
        self.assertEqual(None, osutils.safe_file_id(None))
805
829
 
806
830
 
 
831
class TestSendAll(tests.TestCase):
 
832
 
 
833
    def test_send_with_disconnected_socket(self):
 
834
        class DisconnectedSocket(object):
 
835
            def __init__(self, err):
 
836
                self.err = err
 
837
            def send(self, content):
 
838
                raise self.err
 
839
            def close(self):
 
840
                pass
 
841
        # All of these should be treated as ConnectionReset
 
842
        errs = []
 
843
        for err_cls in (IOError, socket.error):
 
844
            for errnum in osutils._end_of_stream_errors:
 
845
                errs.append(err_cls(errnum))
 
846
        for err in errs:
 
847
            sock = DisconnectedSocket(err)
 
848
            self.assertRaises(errors.ConnectionReset,
 
849
                osutils.send_all, sock, 'some more content')
 
850
 
 
851
    def test_send_with_no_progress(self):
 
852
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
853
        # It seems that paramiko can get into a state where it doesn't error,
 
854
        # but it returns 0 bytes sent for requests over and over again.
 
855
        class NoSendingSocket(object):
 
856
            def __init__(self):
 
857
                self.call_count = 0
 
858
            def send(self, bytes):
 
859
                self.call_count += 1
 
860
                if self.call_count > 100:
 
861
                    # Prevent the test suite from hanging
 
862
                    raise RuntimeError('too many calls')
 
863
                return 0
 
864
        sock = NoSendingSocket()
 
865
        self.assertRaises(errors.ConnectionReset,
 
866
                          osutils.send_all, sock, 'content')
 
867
        self.assertEqual(1, sock.call_count)
 
868
 
 
869
 
 
870
class TestPosixFuncs(tests.TestCase):
 
871
    """Test that the posix version of normpath returns an appropriate path
 
872
       when used with 2 leading slashes."""
 
873
 
 
874
    def test_normpath(self):
 
875
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
876
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
877
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
878
 
 
879
 
807
880
class TestWin32Funcs(tests.TestCase):
808
881
    """Test that _win32 versions of os utilities return appropriate paths."""
809
882
 
874
947
    """Test win32 functions that create files."""
875
948
 
876
949
    def test_getcwd(self):
877
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
950
        self.requireFeature(features.UnicodeFilenameFeature)
878
951
        os.mkdir(u'mu-\xb5')
879
952
        os.chdir(u'mu-\xb5')
880
953
        # TODO: jam 20060427 This will probably fail on Mac OSX because
910
983
        b.close()
911
984
 
912
985
        osutils._win32_rename('b', 'a')
913
 
        self.failUnlessExists('a')
914
 
        self.failIfExists('b')
 
986
        self.assertPathExists('a')
 
987
        self.assertPathDoesNotExist('b')
915
988
        self.assertFileEqual('baz\n', 'a')
916
989
 
917
990
    def test_rename_missing_file(self):
970
1043
    """Test mac special functions that require directories."""
971
1044
 
972
1045
    def test_getcwd(self):
973
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1046
        self.requireFeature(features.UnicodeFilenameFeature)
974
1047
        os.mkdir(u'B\xe5gfors')
975
1048
        os.chdir(u'B\xe5gfors')
976
1049
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
977
1050
 
978
1051
    def test_getcwd_nonnorm(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1052
        self.requireFeature(features.UnicodeFilenameFeature)
980
1053
        # Test that _mac_getcwd() will normalize this path
981
1054
        os.mkdir(u'Ba\u030agfors')
982
1055
        os.chdir(u'Ba\u030agfors')
1092
1165
        # are not using the filesystem's encoding
1093
1166
 
1094
1167
        # require a bytestring based filesystem
1095
 
        self.requireFeature(tests.ByteStringNamedFilesystem)
 
1168
        self.requireFeature(features.ByteStringNamedFilesystem)
1096
1169
 
1097
1170
        tree = [
1098
1171
            '.bzr',
1107
1180
 
1108
1181
        # rename the 1file to a latin-1 filename
1109
1182
        os.rename("./1file", "\xe8file")
 
1183
        if "\xe8file" not in os.listdir("."):
 
1184
            self.skip("Lack filesystem that preserves arbitrary bytes")
1110
1185
 
1111
1186
        self._save_platform_info()
1112
1187
        win32utils.winver = None # Avoid the win32 detection code
1190
1265
        self.requireFeature(UTF8DirReaderFeature)
1191
1266
        self._save_platform_info()
1192
1267
        win32utils.winver = None # Avoid the win32 detection code
1193
 
        osutils._fs_enc = 'UTF-8'
1194
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1268
        osutils._fs_enc = 'utf-8'
 
1269
        self.assertDirReaderIs(
 
1270
            UTF8DirReaderFeature.module.UTF8DirReader)
1195
1271
 
1196
1272
    def test_force_walkdirs_utf8_fs_ascii(self):
1197
1273
        self.requireFeature(UTF8DirReaderFeature)
1198
1274
        self._save_platform_info()
1199
1275
        win32utils.winver = None # Avoid the win32 detection code
1200
 
        osutils._fs_enc = 'US-ASCII'
1201
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1202
 
 
1203
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1204
 
        self.requireFeature(UTF8DirReaderFeature)
1205
 
        self._save_platform_info()
1206
 
        win32utils.winver = None # Avoid the win32 detection code
1207
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1208
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1276
        osutils._fs_enc = 'ascii'
 
1277
        self.assertDirReaderIs(
 
1278
            UTF8DirReaderFeature.module.UTF8DirReader)
1209
1279
 
1210
1280
    def test_force_walkdirs_utf8_fs_latin1(self):
1211
1281
        self._save_platform_info()
1212
1282
        win32utils.winver = None # Avoid the win32 detection code
1213
 
        osutils._fs_enc = 'latin1'
 
1283
        osutils._fs_enc = 'iso-8859-1'
1214
1284
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1215
1285
 
1216
1286
    def test_force_walkdirs_utf8_nt(self):
1229
1299
 
1230
1300
    def test_unicode_walkdirs(self):
1231
1301
        """Walkdirs should always return unicode paths."""
1232
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1302
        self.requireFeature(features.UnicodeFilenameFeature)
1233
1303
        name0 = u'0file-\xb6'
1234
1304
        name1 = u'1dir-\u062c\u0648'
1235
1305
        name2 = u'2file-\u0633'
1272
1342
 
1273
1343
        The abspath portion might be in unicode or utf-8
1274
1344
        """
1275
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1345
        self.requireFeature(features.UnicodeFilenameFeature)
1276
1346
        name0 = u'0file-\xb6'
1277
1347
        name1 = u'1dir-\u062c\u0648'
1278
1348
        name2 = u'2file-\u0633'
1333
1403
 
1334
1404
        The abspath portion should be in unicode
1335
1405
        """
1336
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1406
        self.requireFeature(features.UnicodeFilenameFeature)
1337
1407
        # Use the unicode reader. TODO: split into driver-and-driven unit
1338
1408
        # tests.
1339
1409
        self._save_platform_info()
1380
1450
 
1381
1451
    def test__walkdirs_utf8_win32readdir(self):
1382
1452
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1383
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1453
        self.requireFeature(features.UnicodeFilenameFeature)
1384
1454
        from bzrlib._walkdirs_win32 import Win32ReadDir
1385
1455
        self._save_platform_info()
1386
1456
        osutils._selected_dir_reader = Win32ReadDir()
1437
1507
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1438
1508
        """make sure our Stat values are valid"""
1439
1509
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1440
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1510
        self.requireFeature(features.UnicodeFilenameFeature)
1441
1511
        from bzrlib._walkdirs_win32 import Win32ReadDir
1442
1512
        name0u = u'0file-\xb6'
1443
1513
        name0 = name0u.encode('utf8')
1461
1531
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1462
1532
        """make sure our Stat values are valid"""
1463
1533
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1464
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1534
        self.requireFeature(features.UnicodeFilenameFeature)
1465
1535
        from bzrlib._walkdirs_win32 import Win32ReadDir
1466
1536
        name0u = u'0dir-\u062c\u0648'
1467
1537
        name0 = name0u.encode('utf8')
1567
1637
        self.assertEqual(['c'], os.listdir('target/b'))
1568
1638
 
1569
1639
    def test_copy_tree_symlinks(self):
1570
 
        self.requireFeature(tests.SymlinkFeature)
 
1640
        self.requireFeature(features.SymlinkFeature)
1571
1641
        self.build_tree(['source/'])
1572
1642
        os.symlink('a/generic/path', 'source/lnk')
1573
1643
        osutils.copy_tree('source', 'target')
1598
1668
                          ('d', 'source/b', 'target/b'),
1599
1669
                          ('f', 'source/b/c', 'target/b/c'),
1600
1670
                         ], processed_files)
1601
 
        self.failIfExists('target')
 
1671
        self.assertPathDoesNotExist('target')
1602
1672
        if osutils.has_symlinks():
1603
1673
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1604
1674
 
1650
1720
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1651
1721
        self.assertEqual('foo', old)
1652
1722
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1653
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1723
        self.assertFalse('BZR_TEST_ENV_VAR' in os.environ)
1654
1724
 
1655
1725
 
1656
1726
class TestSizeShaFile(tests.TestCaseInTempDir):
1733
1803
 
1734
1804
class TestDirReader(tests.TestCaseInTempDir):
1735
1805
 
 
1806
    scenarios = dir_reader_scenarios()
 
1807
 
1736
1808
    # Set by load_tests
1737
1809
    _dir_reader_class = None
1738
1810
    _native_to_unicode = None
1739
1811
 
1740
1812
    def setUp(self):
1741
 
        tests.TestCaseInTempDir.setUp(self)
 
1813
        super(TestDirReader, self).setUp()
1742
1814
        self.overrideAttr(osutils,
1743
1815
                          '_selected_dir_reader', self._dir_reader_class())
1744
1816
 
1838
1910
        return filtered_dirblocks
1839
1911
 
1840
1912
    def test_walk_unicode_tree(self):
1841
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1913
        self.requireFeature(features.UnicodeFilenameFeature)
1842
1914
        tree, expected_dirblocks = self._get_unicode_tree()
1843
1915
        self.build_tree(tree)
1844
1916
        result = list(osutils._walkdirs_utf8('.'))
1845
1917
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1846
1918
 
1847
1919
    def test_symlink(self):
1848
 
        self.requireFeature(tests.SymlinkFeature)
1849
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1920
        self.requireFeature(features.SymlinkFeature)
 
1921
        self.requireFeature(features.UnicodeFilenameFeature)
1850
1922
        target = u'target\N{Euro Sign}'
1851
1923
        link_name = u'l\N{Euro Sign}nk'
1852
1924
        os.symlink(target, link_name)
1870
1942
    But prior python versions failed to properly encode the passed unicode
1871
1943
    string.
1872
1944
    """
1873
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1945
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1874
1946
 
1875
1947
    def setUp(self):
1876
1948
        super(tests.TestCaseInTempDir, self).setUp()
1879
1951
        os.symlink(self.target, self.link)
1880
1952
 
1881
1953
    def test_os_readlink_link_encoding(self):
1882
 
        if sys.version_info < (2, 6):
1883
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1884
 
        else:
1885
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
1954
        self.assertEquals(self.target,  os.readlink(self.link))
1886
1955
 
1887
1956
    def test_os_readlink_link_decoding(self):
1888
1957
        self.assertEquals(self.target.encode(osutils._fs_enc),
1900
1969
        self.assertIsInstance(concurrency, int)
1901
1970
 
1902
1971
    def test_local_concurrency_environment_variable(self):
1903
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
1972
        self.overrideEnv('BZR_CONCURRENCY', '2')
1904
1973
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1905
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
1974
        self.overrideEnv('BZR_CONCURRENCY', '3')
1906
1975
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1907
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
1976
        self.overrideEnv('BZR_CONCURRENCY', 'foo')
1908
1977
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1909
1978
 
1910
1979
    def test_option_concurrency(self):
1911
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
1980
        self.overrideEnv('BZR_CONCURRENCY', '1')
1912
1981
        self.run_bzr('rocks --concurrency 42')
1913
 
        # Command line overrides envrionment variable
 
1982
        # Command line overrides environment variable
1914
1983
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1915
1984
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1916
1985
 
1955
2024
class TestTerminalWidth(tests.TestCase):
1956
2025
 
1957
2026
    def setUp(self):
1958
 
        tests.TestCase.setUp(self)
 
2027
        super(TestTerminalWidth, self).setUp()
1959
2028
        self._orig_terminal_size_state = osutils._terminal_size_state
1960
2029
        self._orig_first_terminal_size = osutils._first_terminal_size
1961
2030
        self.addCleanup(self.restore_osutils_globals)
1986
2055
    def test_defaults_to_BZR_COLUMNS(self):
1987
2056
        # BZR_COLUMNS is set by the test framework
1988
2057
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1989
 
        os.environ['BZR_COLUMNS'] = '12'
 
2058
        self.overrideEnv('BZR_COLUMNS', '12')
1990
2059
        self.assertEqual(12, osutils.terminal_width())
1991
2060
 
 
2061
    def test_BZR_COLUMNS_0_no_limit(self):
 
2062
        self.overrideEnv('BZR_COLUMNS', '0')
 
2063
        self.assertEqual(None, osutils.terminal_width())
 
2064
 
1992
2065
    def test_falls_back_to_COLUMNS(self):
1993
 
        del os.environ['BZR_COLUMNS']
 
2066
        self.overrideEnv('BZR_COLUMNS', None)
1994
2067
        self.assertNotEqual('42', os.environ['COLUMNS'])
1995
2068
        self.set_fake_tty()
1996
 
        os.environ['COLUMNS'] = '42'
 
2069
        self.overrideEnv('COLUMNS', '42')
1997
2070
        self.assertEqual(42, osutils.terminal_width())
1998
2071
 
1999
2072
    def test_tty_default_without_columns(self):
2000
 
        del os.environ['BZR_COLUMNS']
2001
 
        del os.environ['COLUMNS']
 
2073
        self.overrideEnv('BZR_COLUMNS', None)
 
2074
        self.overrideEnv('COLUMNS', None)
2002
2075
 
2003
2076
        def terminal_size(w, h):
2004
2077
            return 42, 42
2011
2084
        self.assertEqual(42, osutils.terminal_width())
2012
2085
 
2013
2086
    def test_non_tty_default_without_columns(self):
2014
 
        del os.environ['BZR_COLUMNS']
2015
 
        del os.environ['COLUMNS']
 
2087
        self.overrideEnv('BZR_COLUMNS', None)
 
2088
        self.overrideEnv('COLUMNS', None)
2016
2089
        self.replace_stdout(None)
2017
2090
        self.assertEqual(None, osutils.terminal_width())
2018
2091
 
2028
2101
        else:
2029
2102
            self.overrideAttr(termios, 'TIOCGWINSZ')
2030
2103
            del termios.TIOCGWINSZ
2031
 
        del os.environ['BZR_COLUMNS']
2032
 
        del os.environ['COLUMNS']
 
2104
        self.overrideEnv('BZR_COLUMNS', None)
 
2105
        self.overrideEnv('COLUMNS', None)
2033
2106
        # Whatever the result is, if we don't raise an exception, it's ok.
2034
2107
        osutils.terminal_width()
2035
2108
 
 
2109
 
2036
2110
class TestCreationOps(tests.TestCaseInTempDir):
2037
2111
    _test_needs_features = [features.chown_feature]
2038
2112
 
2039
2113
    def setUp(self):
2040
 
        tests.TestCaseInTempDir.setUp(self)
 
2114
        super(TestCreationOps, self).setUp()
2041
2115
        self.overrideAttr(os, 'chown', self._dummy_chown)
2042
2116
 
2043
2117
        # params set by call to _dummy_chown
2068
2142
        self.assertEquals(self.uid, s.st_uid)
2069
2143
        self.assertEquals(self.gid, s.st_gid)
2070
2144
 
 
2145
 
 
2146
class TestPathFromEnviron(tests.TestCase):
 
2147
 
 
2148
    def test_is_unicode(self):
 
2149
        self.overrideEnv('BZR_TEST_PATH', './anywhere at all/')
 
2150
        path = osutils.path_from_environ('BZR_TEST_PATH')
 
2151
        self.assertIsInstance(path, unicode)
 
2152
        self.assertEqual(u'./anywhere at all/', path)
 
2153
 
 
2154
    def test_posix_path_env_ascii(self):
 
2155
        self.overrideEnv('BZR_TEST_PATH', '/tmp')
 
2156
        home = osutils._posix_path_from_environ('BZR_TEST_PATH')
 
2157
        self.assertIsInstance(home, unicode)
 
2158
        self.assertEqual(u'/tmp', home)
 
2159
 
 
2160
    def test_posix_path_env_unicode(self):
 
2161
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2162
        self.overrideEnv('BZR_TEST_PATH', '/home/\xa7test')
 
2163
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2164
        self.assertEqual(u'/home/\xa7test',
 
2165
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
 
2166
        osutils._fs_enc = "iso8859-5"
 
2167
        self.assertEqual(u'/home/\u0407test',
 
2168
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
 
2169
        osutils._fs_enc = "utf-8"
 
2170
        self.assertRaises(errors.BadFilenameEncoding,
 
2171
            osutils._posix_path_from_environ, 'BZR_TEST_PATH')
 
2172
 
 
2173
 
 
2174
class TestGetHomeDir(tests.TestCase):
 
2175
 
 
2176
    def test_is_unicode(self):
 
2177
        home = osutils._get_home_dir()
 
2178
        self.assertIsInstance(home, unicode)
 
2179
 
 
2180
    def test_posix_homeless(self):
 
2181
        self.overrideEnv('HOME', None)
 
2182
        home = osutils._get_home_dir()
 
2183
        self.assertIsInstance(home, unicode)
 
2184
 
 
2185
    def test_posix_home_ascii(self):
 
2186
        self.overrideEnv('HOME', '/home/test')
 
2187
        home = osutils._posix_get_home_dir()
 
2188
        self.assertIsInstance(home, unicode)
 
2189
        self.assertEqual(u'/home/test', home)
 
2190
 
 
2191
    def test_posix_home_unicode(self):
 
2192
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2193
        self.overrideEnv('HOME', '/home/\xa7test')
 
2194
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2195
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2196
        osutils._fs_enc = "iso8859-5"
 
2197
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2198
        osutils._fs_enc = "utf-8"
 
2199
        self.assertRaises(errors.BadFilenameEncoding,
 
2200
            osutils._posix_get_home_dir)
 
2201
 
 
2202
 
2071
2203
class TestGetuserUnicode(tests.TestCase):
2072
2204
 
 
2205
    def test_is_unicode(self):
 
2206
        user = osutils.getuser_unicode()
 
2207
        self.assertIsInstance(user, unicode)
 
2208
 
 
2209
    def envvar_to_override(self):
 
2210
        if sys.platform == "win32":
 
2211
            # Disable use of platform calls on windows so envvar is used
 
2212
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2213
            return 'USERNAME' # only variable used on windows
 
2214
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2215
 
2073
2216
    def test_ascii_user(self):
2074
 
        osutils.set_or_unset_env('LOGNAME', 'jrandom')
 
2217
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
2075
2218
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
2076
2219
 
2077
2220
    def test_unicode_user(self):
2083
2226
                % (osutils.get_user_encoding(),))
2084
2227
        uni_username = u'jrandom' + uni_val
2085
2228
        encoded_username = uni_username.encode(ue)
2086
 
        osutils.set_or_unset_env('LOGNAME', encoded_username)
 
2229
        self.overrideEnv(self.envvar_to_override(), encoded_username)
2087
2230
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2231
 
 
2232
 
 
2233
class TestBackupNames(tests.TestCase):
 
2234
 
 
2235
    def setUp(self):
 
2236
        super(TestBackupNames, self).setUp()
 
2237
        self.backups = []
 
2238
 
 
2239
    def backup_exists(self, name):
 
2240
        return name in self.backups
 
2241
 
 
2242
    def available_backup_name(self, name):
 
2243
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2244
        self.backups.append(backup_name)
 
2245
        return backup_name
 
2246
 
 
2247
    def assertBackupName(self, expected, name):
 
2248
        self.assertEqual(expected, self.available_backup_name(name))
 
2249
 
 
2250
    def test_empty(self):
 
2251
        self.assertBackupName('file.~1~', 'file')
 
2252
 
 
2253
    def test_existing(self):
 
2254
        self.available_backup_name('file')
 
2255
        self.available_backup_name('file')
 
2256
        self.assertBackupName('file.~3~', 'file')
 
2257
        # Empty slots are found, this is not a strict requirement and may be
 
2258
        # revisited if we test against all implementations.
 
2259
        self.backups.remove('file.~2~')
 
2260
        self.assertBackupName('file.~2~', 'file')
 
2261
 
 
2262
 
 
2263
class TestFindExecutableInPath(tests.TestCase):
 
2264
 
 
2265
    def test_windows(self):
 
2266
        if sys.platform != 'win32':
 
2267
            raise tests.TestSkipped('test requires win32')
 
2268
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2269
        self.assertTrue(
 
2270
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2271
        self.assertTrue(
 
2272
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2273
        self.assertTrue(
 
2274
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2275
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2276
        
 
2277
    def test_windows_app_path(self):
 
2278
        if sys.platform != 'win32':
 
2279
            raise tests.TestSkipped('test requires win32')
 
2280
        # Override PATH env var so that exe can only be found on App Path
 
2281
        self.overrideEnv('PATH', '')
 
2282
        # Internet Explorer is always registered in the App Path
 
2283
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2284
 
 
2285
    def test_other(self):
 
2286
        if sys.platform == 'win32':
 
2287
            raise tests.TestSkipped('test requires non-win32')
 
2288
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2289
        self.assertTrue(
 
2290
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2291
 
 
2292
 
 
2293
class TestEnvironmentErrors(tests.TestCase):
 
2294
    """Test handling of environmental errors"""
 
2295
 
 
2296
    def test_is_oserror(self):
 
2297
        self.assertTrue(osutils.is_environment_error(
 
2298
            OSError(errno.EINVAL, "Invalid parameter")))
 
2299
 
 
2300
    def test_is_ioerror(self):
 
2301
        self.assertTrue(osutils.is_environment_error(
 
2302
            IOError(errno.EINVAL, "Invalid parameter")))
 
2303
 
 
2304
    def test_is_socket_error(self):
 
2305
        self.assertTrue(osutils.is_environment_error(
 
2306
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2307
 
 
2308
    def test_is_select_error(self):
 
2309
        self.assertTrue(osutils.is_environment_error(
 
2310
            select.error(errno.EINVAL, "Invalid parameter")))
 
2311
 
 
2312
    def test_is_pywintypes_error(self):
 
2313
        self.requireFeature(features.pywintypes)
 
2314
        import pywintypes
 
2315
        self.assertTrue(osutils.is_environment_error(
 
2316
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))