~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Kit Randel
  • Date: 2014-12-15 20:24:42 UTC
  • mto: This revision was merged to the branch mainline in revision 6602.
  • Revision ID: kit.randel@canonical.com-20141215202442-usf2ixhypqg8yh6q
added a note for bug-1400567 to the 2.7b release notes

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
20
20
import errno
21
21
import os
22
22
import re
 
23
import select
23
24
import socket
24
25
import sys
 
26
import tempfile
25
27
import time
26
28
 
27
29
from bzrlib import (
38
40
    file_utils,
39
41
    test__walkdirs_win32,
40
42
    )
41
 
 
42
 
 
43
 
class _UTF8DirReaderFeature(tests.Feature):
 
43
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
44
 
 
45
 
 
46
class _UTF8DirReaderFeature(features.Feature):
44
47
 
45
48
    def _probe(self):
46
49
        try:
53
56
    def feature_name(self):
54
57
        return 'bzrlib._readdir_pyx'
55
58
 
56
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
 
59
UTF8DirReaderFeature = features.ModuleAvailableFeature('bzrlib._readdir_pyx')
57
60
 
58
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
 
61
term_ios_feature = features.ModuleAvailableFeature('termios')
59
62
 
60
63
 
61
64
def _already_unicode(s):
96
99
    return scenarios
97
100
 
98
101
 
99
 
def load_tests(basic_tests, module, loader):
100
 
    suite = loader.suiteClass()
101
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
102
 
        basic_tests, tests.condition_isinstance(TestDirReader))
103
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
104
 
    suite.addTest(remaining_tests)
105
 
    return suite
 
102
load_tests = load_tests_apply_scenarios
106
103
 
107
104
 
108
105
class TestContainsWhitespace(tests.TestCase):
109
106
 
110
107
    def test_contains_whitespace(self):
111
 
        self.failUnless(osutils.contains_whitespace(u' '))
112
 
        self.failUnless(osutils.contains_whitespace(u'hello there'))
113
 
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
114
 
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
115
 
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
116
 
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
108
        self.assertTrue(osutils.contains_whitespace(u' '))
 
109
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
 
110
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
 
111
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
 
112
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
 
113
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
117
114
 
118
115
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
119
116
        # is whitespace, but we do not.
120
 
        self.failIf(osutils.contains_whitespace(u''))
121
 
        self.failIf(osutils.contains_whitespace(u'hellothere'))
122
 
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
117
        self.assertFalse(osutils.contains_whitespace(u''))
 
118
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
 
119
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
123
120
 
124
121
 
125
122
class TestRename(tests.TestCaseInTempDir):
139
136
        # This should work everywhere
140
137
        self.create_file('a', 'something in a\n')
141
138
        self._fancy_rename('a', 'b')
142
 
        self.failIfExists('a')
143
 
        self.failUnlessExists('b')
 
139
        self.assertPathDoesNotExist('a')
 
140
        self.assertPathExists('b')
144
141
        self.check_file_contents('b', 'something in a\n')
145
142
 
146
143
        self.create_file('a', 'new something in a\n')
153
150
        self.create_file('target', 'data in target\n')
154
151
        self.assertRaises((IOError, OSError), self._fancy_rename,
155
152
                          'missingsource', 'target')
156
 
        self.failUnlessExists('target')
 
153
        self.assertPathExists('target')
157
154
        self.check_file_contents('target', 'data in target\n')
158
155
 
159
156
    def test_fancy_rename_fails_if_source_and_target_missing(self):
164
161
        # Rename should be semi-atomic on all platforms
165
162
        self.create_file('a', 'something in a\n')
166
163
        osutils.rename('a', 'b')
167
 
        self.failIfExists('a')
168
 
        self.failUnlessExists('b')
 
164
        self.assertPathDoesNotExist('a')
 
165
        self.assertPathExists('b')
169
166
        self.check_file_contents('b', 'something in a\n')
170
167
 
171
168
        self.create_file('a', 'new something in a\n')
185
182
        shape = sorted(os.listdir('.'))
186
183
        self.assertEquals(['A', 'B'], shape)
187
184
 
 
185
    def test_rename_exception(self):
 
186
        try:
 
187
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
 
188
        except OSError, e:
 
189
            self.assertEqual(e.old_filename, 'nonexistent_path')
 
190
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
 
191
            self.assertTrue('nonexistent_path' in e.strerror)
 
192
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
193
 
188
194
 
189
195
class TestRandChars(tests.TestCase):
190
196
 
236
242
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
237
243
 
238
244
 
 
245
class TestLstat(tests.TestCaseInTempDir):
 
246
 
 
247
    def test_lstat_matches_fstat(self):
 
248
        # On Windows, lstat and fstat don't always agree, primarily in the
 
249
        # 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
 
250
        # custom implementation.
 
251
        if sys.platform == 'win32':
 
252
            # We only have special lstat/fstat if we have the extension.
 
253
            # Without it, we may end up re-reading content when we don't have
 
254
            # to, but otherwise it doesn't effect correctness.
 
255
            self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
256
        f = open('test-file.txt', 'wb')
 
257
        self.addCleanup(f.close)
 
258
        f.write('some content\n')
 
259
        f.flush()
 
260
        self.assertEqualStat(osutils.fstat(f.fileno()),
 
261
                             osutils.lstat('test-file.txt'))
 
262
 
 
263
 
239
264
class TestRmTree(tests.TestCaseInTempDir):
240
265
 
241
266
    def test_rmtree(self):
253
278
 
254
279
        osutils.rmtree('dir')
255
280
 
256
 
        self.failIfExists('dir/file')
257
 
        self.failIfExists('dir')
 
281
        self.assertPathDoesNotExist('dir/file')
 
282
        self.assertPathDoesNotExist('dir')
258
283
 
259
284
 
260
285
class TestDeleteAny(tests.TestCaseInTempDir):
412
437
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
413
438
 
414
439
 
 
440
class TestFdatasync(tests.TestCaseInTempDir):
 
441
 
 
442
    def do_fdatasync(self):
 
443
        f = tempfile.NamedTemporaryFile()
 
444
        osutils.fdatasync(f.fileno())
 
445
        f.close()
 
446
 
 
447
    @staticmethod
 
448
    def raise_eopnotsupp(*args, **kwargs):
 
449
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
 
450
 
 
451
    @staticmethod
 
452
    def raise_enotsup(*args, **kwargs):
 
453
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
 
454
 
 
455
    def test_fdatasync_handles_system_function(self):
 
456
        self.overrideAttr(os, "fdatasync")
 
457
        self.do_fdatasync()
 
458
 
 
459
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
 
460
        self.overrideAttr(os, "fdatasync")
 
461
        self.overrideAttr(os, "fsync")
 
462
        self.do_fdatasync()
 
463
 
 
464
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
 
465
        self.overrideAttr(errno, "EOPNOTSUPP")
 
466
        self.do_fdatasync()
 
467
 
 
468
    def test_fdatasync_catches_ENOTSUP(self):
 
469
        enotsup = getattr(errno, "ENOTSUP", None)
 
470
        if enotsup is None:
 
471
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
 
472
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
 
473
        self.do_fdatasync()
 
474
 
 
475
    def test_fdatasync_catches_EOPNOTSUPP(self):
 
476
        enotsup = getattr(errno, "EOPNOTSUPP", None)
 
477
        if enotsup is None:
 
478
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
 
479
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
 
480
        self.do_fdatasync()
 
481
 
 
482
 
415
483
class TestLinks(tests.TestCaseInTempDir):
416
484
 
417
485
    def test_dereference_path(self):
418
 
        self.requireFeature(tests.SymlinkFeature)
 
486
        self.requireFeature(features.SymlinkFeature)
419
487
        cwd = osutils.realpath('.')
420
488
        os.mkdir('bar')
421
489
        bar_path = osutils.pathjoin(cwd, 'bar')
468
536
 
469
537
class TestCanonicalRelPath(tests.TestCaseInTempDir):
470
538
 
471
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
539
    _test_needs_features = [features.CaseInsCasePresFilenameFeature]
472
540
 
473
541
    def test_canonical_relpath_simple(self):
474
542
        f = file('MixedCaseName', 'w')
475
543
        f.close()
476
544
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
477
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
545
        self.assertEqual('work/MixedCaseName', actual)
478
546
 
479
547
    def test_canonical_relpath_missing_tail(self):
480
548
        os.mkdir('MixedCaseParent')
481
549
        actual = osutils.canonical_relpath(self.test_base_dir,
482
550
                                           'mixedcaseparent/nochild')
483
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
 
551
        self.assertEqual('work/MixedCaseParent/nochild', actual)
484
552
 
485
553
 
486
554
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
530
598
    """Test pumpfile method."""
531
599
 
532
600
    def setUp(self):
533
 
        tests.TestCase.setUp(self)
 
601
        super(TestPumpFile, self).setUp()
534
602
        # create a test datablock
535
603
        self.block_size = 512
536
604
        pattern = '0123456789ABCDEF'
804
872
        self.assertEqual(None, osutils.safe_file_id(None))
805
873
 
806
874
 
 
875
class TestSendAll(tests.TestCase):
 
876
 
 
877
    def test_send_with_disconnected_socket(self):
 
878
        class DisconnectedSocket(object):
 
879
            def __init__(self, err):
 
880
                self.err = err
 
881
            def send(self, content):
 
882
                raise self.err
 
883
            def close(self):
 
884
                pass
 
885
        # All of these should be treated as ConnectionReset
 
886
        errs = []
 
887
        for err_cls in (IOError, socket.error):
 
888
            for errnum in osutils._end_of_stream_errors:
 
889
                errs.append(err_cls(errnum))
 
890
        for err in errs:
 
891
            sock = DisconnectedSocket(err)
 
892
            self.assertRaises(errors.ConnectionReset,
 
893
                osutils.send_all, sock, 'some more content')
 
894
 
 
895
    def test_send_with_no_progress(self):
 
896
        # See https://bugs.launchpad.net/bzr/+bug/1047309
 
897
        # It seems that paramiko can get into a state where it doesn't error,
 
898
        # but it returns 0 bytes sent for requests over and over again.
 
899
        class NoSendingSocket(object):
 
900
            def __init__(self):
 
901
                self.call_count = 0
 
902
            def send(self, bytes):
 
903
                self.call_count += 1
 
904
                if self.call_count > 100:
 
905
                    # Prevent the test suite from hanging
 
906
                    raise RuntimeError('too many calls')
 
907
                return 0
 
908
        sock = NoSendingSocket()
 
909
        self.assertRaises(errors.ConnectionReset,
 
910
                          osutils.send_all, sock, 'content')
 
911
        self.assertEqual(1, sock.call_count)
 
912
 
 
913
 
 
914
class TestPosixFuncs(tests.TestCase):
 
915
    """Test that the posix version of normpath returns an appropriate path
 
916
       when used with 2 leading slashes."""
 
917
 
 
918
    def test_normpath(self):
 
919
        self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
 
920
        self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
 
921
        self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
 
922
 
 
923
 
807
924
class TestWin32Funcs(tests.TestCase):
808
925
    """Test that _win32 versions of os utilities return appropriate paths."""
809
926
 
826
943
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
827
944
        self.assertEqual('path/to/foo',
828
945
                         osutils._win32_pathjoin('path/to/', 'foo'))
829
 
        self.assertEqual('/foo',
 
946
 
 
947
    def test_pathjoin_late_bugfix(self):
 
948
        if sys.version_info < (2, 7, 6):
 
949
            expected = '/foo'
 
950
        else:
 
951
            expected = 'C:/foo'
 
952
        self.assertEqual(expected,
830
953
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
831
 
        self.assertEqual('/foo',
 
954
        self.assertEqual(expected,
832
955
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
833
956
 
834
957
    def test_normpath(self):
874
997
    """Test win32 functions that create files."""
875
998
 
876
999
    def test_getcwd(self):
877
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1000
        self.requireFeature(features.UnicodeFilenameFeature)
878
1001
        os.mkdir(u'mu-\xb5')
879
1002
        os.chdir(u'mu-\xb5')
880
1003
        # TODO: jam 20060427 This will probably fail on Mac OSX because
910
1033
        b.close()
911
1034
 
912
1035
        osutils._win32_rename('b', 'a')
913
 
        self.failUnlessExists('a')
914
 
        self.failIfExists('b')
 
1036
        self.assertPathExists('a')
 
1037
        self.assertPathDoesNotExist('b')
915
1038
        self.assertFileEqual('baz\n', 'a')
916
1039
 
917
1040
    def test_rename_missing_file(self):
970
1093
    """Test mac special functions that require directories."""
971
1094
 
972
1095
    def test_getcwd(self):
973
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1096
        self.requireFeature(features.UnicodeFilenameFeature)
974
1097
        os.mkdir(u'B\xe5gfors')
975
1098
        os.chdir(u'B\xe5gfors')
976
1099
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
977
1100
 
978
1101
    def test_getcwd_nonnorm(self):
979
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1102
        self.requireFeature(features.UnicodeFilenameFeature)
980
1103
        # Test that _mac_getcwd() will normalize this path
981
1104
        os.mkdir(u'Ba\u030agfors')
982
1105
        os.chdir(u'Ba\u030agfors')
1092
1215
        # are not using the filesystem's encoding
1093
1216
 
1094
1217
        # require a bytestring based filesystem
1095
 
        self.requireFeature(tests.ByteStringNamedFilesystem)
 
1218
        self.requireFeature(features.ByteStringNamedFilesystem)
1096
1219
 
1097
1220
        tree = [
1098
1221
            '.bzr',
1107
1230
 
1108
1231
        # rename the 1file to a latin-1 filename
1109
1232
        os.rename("./1file", "\xe8file")
 
1233
        if "\xe8file" not in os.listdir("."):
 
1234
            self.skip("Lack filesystem that preserves arbitrary bytes")
1110
1235
 
1111
1236
        self._save_platform_info()
1112
1237
        win32utils.winver = None # Avoid the win32 detection code
1190
1315
        self.requireFeature(UTF8DirReaderFeature)
1191
1316
        self._save_platform_info()
1192
1317
        win32utils.winver = None # Avoid the win32 detection code
1193
 
        osutils._fs_enc = 'UTF-8'
1194
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1318
        osutils._fs_enc = 'utf-8'
 
1319
        self.assertDirReaderIs(
 
1320
            UTF8DirReaderFeature.module.UTF8DirReader)
1195
1321
 
1196
1322
    def test_force_walkdirs_utf8_fs_ascii(self):
1197
1323
        self.requireFeature(UTF8DirReaderFeature)
1198
1324
        self._save_platform_info()
1199
1325
        win32utils.winver = None # Avoid the win32 detection code
1200
 
        osutils._fs_enc = 'US-ASCII'
1201
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1202
 
 
1203
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1204
 
        self.requireFeature(UTF8DirReaderFeature)
1205
 
        self._save_platform_info()
1206
 
        win32utils.winver = None # Avoid the win32 detection code
1207
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1208
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
1326
        osutils._fs_enc = 'ascii'
 
1327
        self.assertDirReaderIs(
 
1328
            UTF8DirReaderFeature.module.UTF8DirReader)
1209
1329
 
1210
1330
    def test_force_walkdirs_utf8_fs_latin1(self):
1211
1331
        self._save_platform_info()
1212
1332
        win32utils.winver = None # Avoid the win32 detection code
1213
 
        osutils._fs_enc = 'latin1'
 
1333
        osutils._fs_enc = 'iso-8859-1'
1214
1334
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1215
1335
 
1216
1336
    def test_force_walkdirs_utf8_nt(self):
1229
1349
 
1230
1350
    def test_unicode_walkdirs(self):
1231
1351
        """Walkdirs should always return unicode paths."""
1232
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1352
        self.requireFeature(features.UnicodeFilenameFeature)
1233
1353
        name0 = u'0file-\xb6'
1234
1354
        name1 = u'1dir-\u062c\u0648'
1235
1355
        name2 = u'2file-\u0633'
1272
1392
 
1273
1393
        The abspath portion might be in unicode or utf-8
1274
1394
        """
1275
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1395
        self.requireFeature(features.UnicodeFilenameFeature)
1276
1396
        name0 = u'0file-\xb6'
1277
1397
        name1 = u'1dir-\u062c\u0648'
1278
1398
        name2 = u'2file-\u0633'
1333
1453
 
1334
1454
        The abspath portion should be in unicode
1335
1455
        """
1336
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1456
        self.requireFeature(features.UnicodeFilenameFeature)
1337
1457
        # Use the unicode reader. TODO: split into driver-and-driven unit
1338
1458
        # tests.
1339
1459
        self._save_platform_info()
1380
1500
 
1381
1501
    def test__walkdirs_utf8_win32readdir(self):
1382
1502
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1383
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1503
        self.requireFeature(features.UnicodeFilenameFeature)
1384
1504
        from bzrlib._walkdirs_win32 import Win32ReadDir
1385
1505
        self._save_platform_info()
1386
1506
        osutils._selected_dir_reader = Win32ReadDir()
1437
1557
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1438
1558
        """make sure our Stat values are valid"""
1439
1559
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1440
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1560
        self.requireFeature(features.UnicodeFilenameFeature)
1441
1561
        from bzrlib._walkdirs_win32 import Win32ReadDir
1442
1562
        name0u = u'0file-\xb6'
1443
1563
        name0 = name0u.encode('utf8')
1461
1581
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1462
1582
        """make sure our Stat values are valid"""
1463
1583
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1464
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1584
        self.requireFeature(features.UnicodeFilenameFeature)
1465
1585
        from bzrlib._walkdirs_win32 import Win32ReadDir
1466
1586
        name0u = u'0dir-\u062c\u0648'
1467
1587
        name0 = name0u.encode('utf8')
1567
1687
        self.assertEqual(['c'], os.listdir('target/b'))
1568
1688
 
1569
1689
    def test_copy_tree_symlinks(self):
1570
 
        self.requireFeature(tests.SymlinkFeature)
 
1690
        self.requireFeature(features.SymlinkFeature)
1571
1691
        self.build_tree(['source/'])
1572
1692
        os.symlink('a/generic/path', 'source/lnk')
1573
1693
        osutils.copy_tree('source', 'target')
1598
1718
                          ('d', 'source/b', 'target/b'),
1599
1719
                          ('f', 'source/b/c', 'target/b/c'),
1600
1720
                         ], processed_files)
1601
 
        self.failIfExists('target')
 
1721
        self.assertPathDoesNotExist('target')
1602
1722
        if osutils.has_symlinks():
1603
1723
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1604
1724
 
1650
1770
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1651
1771
        self.assertEqual('foo', old)
1652
1772
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1653
 
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1773
        self.assertFalse('BZR_TEST_ENV_VAR' in os.environ)
1654
1774
 
1655
1775
 
1656
1776
class TestSizeShaFile(tests.TestCaseInTempDir):
1733
1853
 
1734
1854
class TestDirReader(tests.TestCaseInTempDir):
1735
1855
 
 
1856
    scenarios = dir_reader_scenarios()
 
1857
 
1736
1858
    # Set by load_tests
1737
1859
    _dir_reader_class = None
1738
1860
    _native_to_unicode = None
1739
1861
 
1740
1862
    def setUp(self):
1741
 
        tests.TestCaseInTempDir.setUp(self)
 
1863
        super(TestDirReader, self).setUp()
1742
1864
        self.overrideAttr(osutils,
1743
1865
                          '_selected_dir_reader', self._dir_reader_class())
1744
1866
 
1838
1960
        return filtered_dirblocks
1839
1961
 
1840
1962
    def test_walk_unicode_tree(self):
1841
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1963
        self.requireFeature(features.UnicodeFilenameFeature)
1842
1964
        tree, expected_dirblocks = self._get_unicode_tree()
1843
1965
        self.build_tree(tree)
1844
1966
        result = list(osutils._walkdirs_utf8('.'))
1845
1967
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1846
1968
 
1847
1969
    def test_symlink(self):
1848
 
        self.requireFeature(tests.SymlinkFeature)
1849
 
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1970
        self.requireFeature(features.SymlinkFeature)
 
1971
        self.requireFeature(features.UnicodeFilenameFeature)
1850
1972
        target = u'target\N{Euro Sign}'
1851
1973
        link_name = u'l\N{Euro Sign}nk'
1852
1974
        os.symlink(target, link_name)
1870
1992
    But prior python versions failed to properly encode the passed unicode
1871
1993
    string.
1872
1994
    """
1873
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
 
1995
    _test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1874
1996
 
1875
1997
    def setUp(self):
1876
1998
        super(tests.TestCaseInTempDir, self).setUp()
1879
2001
        os.symlink(self.target, self.link)
1880
2002
 
1881
2003
    def test_os_readlink_link_encoding(self):
1882
 
        if sys.version_info < (2, 6):
1883
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1884
 
        else:
1885
 
            self.assertEquals(self.target,  os.readlink(self.link))
 
2004
        self.assertEquals(self.target,  os.readlink(self.link))
1886
2005
 
1887
2006
    def test_os_readlink_link_decoding(self):
1888
2007
        self.assertEquals(self.target.encode(osutils._fs_enc),
1900
2019
        self.assertIsInstance(concurrency, int)
1901
2020
 
1902
2021
    def test_local_concurrency_environment_variable(self):
1903
 
        os.environ['BZR_CONCURRENCY'] = '2'
 
2022
        self.overrideEnv('BZR_CONCURRENCY', '2')
1904
2023
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1905
 
        os.environ['BZR_CONCURRENCY'] = '3'
 
2024
        self.overrideEnv('BZR_CONCURRENCY', '3')
1906
2025
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1907
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
 
2026
        self.overrideEnv('BZR_CONCURRENCY', 'foo')
1908
2027
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1909
2028
 
1910
2029
    def test_option_concurrency(self):
1911
 
        os.environ['BZR_CONCURRENCY'] = '1'
 
2030
        self.overrideEnv('BZR_CONCURRENCY', '1')
1912
2031
        self.run_bzr('rocks --concurrency 42')
1913
 
        # Command line overrides envrionment variable
 
2032
        # Command line overrides environment variable
1914
2033
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1915
2034
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1916
2035
 
1955
2074
class TestTerminalWidth(tests.TestCase):
1956
2075
 
1957
2076
    def setUp(self):
1958
 
        tests.TestCase.setUp(self)
 
2077
        super(TestTerminalWidth, self).setUp()
1959
2078
        self._orig_terminal_size_state = osutils._terminal_size_state
1960
2079
        self._orig_first_terminal_size = osutils._first_terminal_size
1961
2080
        self.addCleanup(self.restore_osutils_globals)
1986
2105
    def test_defaults_to_BZR_COLUMNS(self):
1987
2106
        # BZR_COLUMNS is set by the test framework
1988
2107
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1989
 
        os.environ['BZR_COLUMNS'] = '12'
 
2108
        self.overrideEnv('BZR_COLUMNS', '12')
1990
2109
        self.assertEqual(12, osutils.terminal_width())
1991
2110
 
 
2111
    def test_BZR_COLUMNS_0_no_limit(self):
 
2112
        self.overrideEnv('BZR_COLUMNS', '0')
 
2113
        self.assertEqual(None, osutils.terminal_width())
 
2114
 
1992
2115
    def test_falls_back_to_COLUMNS(self):
1993
 
        del os.environ['BZR_COLUMNS']
 
2116
        self.overrideEnv('BZR_COLUMNS', None)
1994
2117
        self.assertNotEqual('42', os.environ['COLUMNS'])
1995
2118
        self.set_fake_tty()
1996
 
        os.environ['COLUMNS'] = '42'
 
2119
        self.overrideEnv('COLUMNS', '42')
1997
2120
        self.assertEqual(42, osutils.terminal_width())
1998
2121
 
1999
2122
    def test_tty_default_without_columns(self):
2000
 
        del os.environ['BZR_COLUMNS']
2001
 
        del os.environ['COLUMNS']
 
2123
        self.overrideEnv('BZR_COLUMNS', None)
 
2124
        self.overrideEnv('COLUMNS', None)
2002
2125
 
2003
2126
        def terminal_size(w, h):
2004
2127
            return 42, 42
2011
2134
        self.assertEqual(42, osutils.terminal_width())
2012
2135
 
2013
2136
    def test_non_tty_default_without_columns(self):
2014
 
        del os.environ['BZR_COLUMNS']
2015
 
        del os.environ['COLUMNS']
 
2137
        self.overrideEnv('BZR_COLUMNS', None)
 
2138
        self.overrideEnv('COLUMNS', None)
2016
2139
        self.replace_stdout(None)
2017
2140
        self.assertEqual(None, osutils.terminal_width())
2018
2141
 
2028
2151
        else:
2029
2152
            self.overrideAttr(termios, 'TIOCGWINSZ')
2030
2153
            del termios.TIOCGWINSZ
2031
 
        del os.environ['BZR_COLUMNS']
2032
 
        del os.environ['COLUMNS']
 
2154
        self.overrideEnv('BZR_COLUMNS', None)
 
2155
        self.overrideEnv('COLUMNS', None)
2033
2156
        # Whatever the result is, if we don't raise an exception, it's ok.
2034
2157
        osutils.terminal_width()
2035
2158
 
 
2159
 
2036
2160
class TestCreationOps(tests.TestCaseInTempDir):
2037
2161
    _test_needs_features = [features.chown_feature]
2038
2162
 
2039
2163
    def setUp(self):
2040
 
        tests.TestCaseInTempDir.setUp(self)
 
2164
        super(TestCreationOps, self).setUp()
2041
2165
        self.overrideAttr(os, 'chown', self._dummy_chown)
2042
2166
 
2043
2167
        # params set by call to _dummy_chown
2068
2192
        self.assertEquals(self.uid, s.st_uid)
2069
2193
        self.assertEquals(self.gid, s.st_gid)
2070
2194
 
 
2195
 
 
2196
class TestPathFromEnviron(tests.TestCase):
 
2197
 
 
2198
    def test_is_unicode(self):
 
2199
        self.overrideEnv('BZR_TEST_PATH', './anywhere at all/')
 
2200
        path = osutils.path_from_environ('BZR_TEST_PATH')
 
2201
        self.assertIsInstance(path, unicode)
 
2202
        self.assertEqual(u'./anywhere at all/', path)
 
2203
 
 
2204
    def test_posix_path_env_ascii(self):
 
2205
        self.overrideEnv('BZR_TEST_PATH', '/tmp')
 
2206
        home = osutils._posix_path_from_environ('BZR_TEST_PATH')
 
2207
        self.assertIsInstance(home, unicode)
 
2208
        self.assertEqual(u'/tmp', home)
 
2209
 
 
2210
    def test_posix_path_env_unicode(self):
 
2211
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2212
        self.overrideEnv('BZR_TEST_PATH', '/home/\xa7test')
 
2213
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2214
        self.assertEqual(u'/home/\xa7test',
 
2215
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
 
2216
        osutils._fs_enc = "iso8859-5"
 
2217
        self.assertEqual(u'/home/\u0407test',
 
2218
            osutils._posix_path_from_environ('BZR_TEST_PATH'))
 
2219
        osutils._fs_enc = "utf-8"
 
2220
        self.assertRaises(errors.BadFilenameEncoding,
 
2221
            osutils._posix_path_from_environ, 'BZR_TEST_PATH')
 
2222
 
 
2223
 
 
2224
class TestGetHomeDir(tests.TestCase):
 
2225
 
 
2226
    def test_is_unicode(self):
 
2227
        home = osutils._get_home_dir()
 
2228
        self.assertIsInstance(home, unicode)
 
2229
 
 
2230
    def test_posix_homeless(self):
 
2231
        self.overrideEnv('HOME', None)
 
2232
        home = osutils._get_home_dir()
 
2233
        self.assertIsInstance(home, unicode)
 
2234
 
 
2235
    def test_posix_home_ascii(self):
 
2236
        self.overrideEnv('HOME', '/home/test')
 
2237
        home = osutils._posix_get_home_dir()
 
2238
        self.assertIsInstance(home, unicode)
 
2239
        self.assertEqual(u'/home/test', home)
 
2240
 
 
2241
    def test_posix_home_unicode(self):
 
2242
        self.requireFeature(features.ByteStringNamedFilesystem)
 
2243
        self.overrideEnv('HOME', '/home/\xa7test')
 
2244
        self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
 
2245
        self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
 
2246
        osutils._fs_enc = "iso8859-5"
 
2247
        self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
 
2248
        osutils._fs_enc = "utf-8"
 
2249
        self.assertRaises(errors.BadFilenameEncoding,
 
2250
            osutils._posix_get_home_dir)
 
2251
 
 
2252
 
2071
2253
class TestGetuserUnicode(tests.TestCase):
2072
2254
 
 
2255
    def test_is_unicode(self):
 
2256
        user = osutils.getuser_unicode()
 
2257
        self.assertIsInstance(user, unicode)
 
2258
 
 
2259
    def envvar_to_override(self):
 
2260
        if sys.platform == "win32":
 
2261
            # Disable use of platform calls on windows so envvar is used
 
2262
            self.overrideAttr(win32utils, 'has_ctypes', False)
 
2263
            return 'USERNAME' # only variable used on windows
 
2264
        return 'LOGNAME' # first variable checked by getpass.getuser()
 
2265
 
2073
2266
    def test_ascii_user(self):
2074
 
        osutils.set_or_unset_env('LOGNAME', 'jrandom')
 
2267
        self.overrideEnv(self.envvar_to_override(), 'jrandom')
2075
2268
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
2076
2269
 
2077
2270
    def test_unicode_user(self):
2083
2276
                % (osutils.get_user_encoding(),))
2084
2277
        uni_username = u'jrandom' + uni_val
2085
2278
        encoded_username = uni_username.encode(ue)
2086
 
        osutils.set_or_unset_env('LOGNAME', encoded_username)
 
2279
        self.overrideEnv(self.envvar_to_override(), encoded_username)
2087
2280
        self.assertEqual(uni_username, osutils.getuser_unicode())
 
2281
 
 
2282
 
 
2283
class TestBackupNames(tests.TestCase):
 
2284
 
 
2285
    def setUp(self):
 
2286
        super(TestBackupNames, self).setUp()
 
2287
        self.backups = []
 
2288
 
 
2289
    def backup_exists(self, name):
 
2290
        return name in self.backups
 
2291
 
 
2292
    def available_backup_name(self, name):
 
2293
        backup_name = osutils.available_backup_name(name, self.backup_exists)
 
2294
        self.backups.append(backup_name)
 
2295
        return backup_name
 
2296
 
 
2297
    def assertBackupName(self, expected, name):
 
2298
        self.assertEqual(expected, self.available_backup_name(name))
 
2299
 
 
2300
    def test_empty(self):
 
2301
        self.assertBackupName('file.~1~', 'file')
 
2302
 
 
2303
    def test_existing(self):
 
2304
        self.available_backup_name('file')
 
2305
        self.available_backup_name('file')
 
2306
        self.assertBackupName('file.~3~', 'file')
 
2307
        # Empty slots are found, this is not a strict requirement and may be
 
2308
        # revisited if we test against all implementations.
 
2309
        self.backups.remove('file.~2~')
 
2310
        self.assertBackupName('file.~2~', 'file')
 
2311
 
 
2312
 
 
2313
class TestFindExecutableInPath(tests.TestCase):
 
2314
 
 
2315
    def test_windows(self):
 
2316
        if sys.platform != 'win32':
 
2317
            raise tests.TestSkipped('test requires win32')
 
2318
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
 
2319
        self.assertTrue(
 
2320
            osutils.find_executable_on_path('explorer.exe') is not None)
 
2321
        self.assertTrue(
 
2322
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
 
2323
        self.assertTrue(
 
2324
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2325
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
 
2326
        
 
2327
    def test_windows_app_path(self):
 
2328
        if sys.platform != 'win32':
 
2329
            raise tests.TestSkipped('test requires win32')
 
2330
        # Override PATH env var so that exe can only be found on App Path
 
2331
        self.overrideEnv('PATH', '')
 
2332
        # Internt Explorer is always registered in the App Path
 
2333
        self.assertTrue(osutils.find_executable_on_path('iexplore') is not None)
 
2334
 
 
2335
    def test_other(self):
 
2336
        if sys.platform == 'win32':
 
2337
            raise tests.TestSkipped('test requires non-win32')
 
2338
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
 
2339
        self.assertTrue(
 
2340
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
2341
 
 
2342
 
 
2343
class TestEnvironmentErrors(tests.TestCase):
 
2344
    """Test handling of environmental errors"""
 
2345
 
 
2346
    def test_is_oserror(self):
 
2347
        self.assertTrue(osutils.is_environment_error(
 
2348
            OSError(errno.EINVAL, "Invalid parameter")))
 
2349
 
 
2350
    def test_is_ioerror(self):
 
2351
        self.assertTrue(osutils.is_environment_error(
 
2352
            IOError(errno.EINVAL, "Invalid parameter")))
 
2353
 
 
2354
    def test_is_socket_error(self):
 
2355
        self.assertTrue(osutils.is_environment_error(
 
2356
            socket.error(errno.EINVAL, "Invalid parameter")))
 
2357
 
 
2358
    def test_is_select_error(self):
 
2359
        self.assertTrue(osutils.is_environment_error(
 
2360
            select.error(errno.EINVAL, "Invalid parameter")))
 
2361
 
 
2362
    def test_is_pywintypes_error(self):
 
2363
        self.requireFeature(features.pywintypes)
 
2364
        import pywintypes
 
2365
        self.assertTrue(osutils.is_environment_error(
 
2366
            pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))