~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Ross Lagerwall
  • Date: 2012-08-07 06:32:51 UTC
  • mto: (6437.63.5 2.5)
  • mto: This revision was merged to the branch mainline in revision 6558.
  • Revision ID: rosslagerwall@gmail.com-20120807063251-x9p03ghg2ws8oqjc
Add bzrlib/locale to .bzrignore

bzrlib/locale is generated by ./setup.py build_mo, which is in turn called
by ./setup.py build.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2016 Canonical Ltd
 
1
# Copyright (C) 2005-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
23
23
import select
24
24
import socket
25
25
import sys
26
 
import tempfile
27
26
import time
28
27
 
29
28
from bzrlib import (
180
179
        # we can't use failUnlessExists on case-insensitive filesystem
181
180
        # so try to check shape of the tree
182
181
        shape = sorted(os.listdir('.'))
183
 
        self.assertEqual(['A', 'B'], shape)
184
 
 
185
 
    def test_rename_exception(self):
186
 
        try:
187
 
            osutils.rename('nonexistent_path', 'different_nonexistent_path')
188
 
        except OSError, e:
189
 
            self.assertEqual(e.old_filename, 'nonexistent_path')
190
 
            self.assertEqual(e.new_filename, 'different_nonexistent_path')
191
 
            self.assertTrue('nonexistent_path' in e.strerror)
192
 
            self.assertTrue('different_nonexistent_path' in e.strerror)
 
182
        self.assertEquals(['A', 'B'], shape)
193
183
 
194
184
 
195
185
class TestRandChars(tests.TestCase):
222
212
                         (['src'], SRC_FOO_C),
223
213
                         (['src'], 'src'),
224
214
                         ]:
225
 
            self.assertTrue(osutils.is_inside_any(dirs, fn))
 
215
            self.assert_(osutils.is_inside_any(dirs, fn))
226
216
        for dirs, fn in [(['src'], 'srccontrol'),
227
217
                         (['src'], 'srccontrol/foo')]:
228
218
            self.assertFalse(osutils.is_inside_any(dirs, fn))
234
224
                         (['src/bar.c', 'bla/foo.c'], 'src'),
235
225
                         (['src'], 'src'),
236
226
                         ]:
237
 
            self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
 
227
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
238
228
 
239
229
        for dirs, fn in [(['src'], 'srccontrol'),
240
230
                         (['srccontrol/foo.c'], 'src'),
298
288
 
299
289
    def test_file_kind(self):
300
290
        self.build_tree(['file', 'dir/'])
301
 
        self.assertEqual('file', osutils.file_kind('file'))
302
 
        self.assertEqual('directory', osutils.file_kind('dir/'))
 
291
        self.assertEquals('file', osutils.file_kind('file'))
 
292
        self.assertEquals('directory', osutils.file_kind('dir/'))
303
293
        if osutils.has_symlinks():
304
294
            os.symlink('symlink', 'symlink')
305
 
            self.assertEqual('symlink', osutils.file_kind('symlink'))
 
295
            self.assertEquals('symlink', osutils.file_kind('symlink'))
306
296
 
307
297
        # TODO: jam 20060529 Test a block device
308
298
        try:
311
301
            if e.errno not in (errno.ENOENT,):
312
302
                raise
313
303
        else:
314
 
            self.assertEqual('chardev', osutils.file_kind('/dev/null'))
 
304
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
315
305
 
316
306
        mkfifo = getattr(os, 'mkfifo', None)
317
307
        if mkfifo:
318
308
            mkfifo('fifo')
319
309
            try:
320
 
                self.assertEqual('fifo', osutils.file_kind('fifo'))
 
310
                self.assertEquals('fifo', osutils.file_kind('fifo'))
321
311
            finally:
322
312
                os.remove('fifo')
323
313
 
326
316
            s = socket.socket(AF_UNIX)
327
317
            s.bind('socket')
328
318
            try:
329
 
                self.assertEqual('socket', osutils.file_kind('socket'))
 
319
                self.assertEquals('socket', osutils.file_kind('socket'))
330
320
            finally:
331
321
                os.remove('socket')
332
322
 
437
427
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
438
428
 
439
429
 
440
 
class TestFdatasync(tests.TestCaseInTempDir):
441
 
 
442
 
    def do_fdatasync(self):
443
 
        f = tempfile.NamedTemporaryFile()
444
 
        osutils.fdatasync(f.fileno())
445
 
        f.close()
446
 
 
447
 
    @staticmethod
448
 
    def raise_eopnotsupp(*args, **kwargs):
449
 
        raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
450
 
 
451
 
    @staticmethod
452
 
    def raise_enotsup(*args, **kwargs):
453
 
        raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
454
 
 
455
 
    def test_fdatasync_handles_system_function(self):
456
 
        self.overrideAttr(os, "fdatasync")
457
 
        self.do_fdatasync()
458
 
 
459
 
    def test_fdatasync_handles_no_fdatasync_no_fsync(self):
460
 
        self.overrideAttr(os, "fdatasync")
461
 
        self.overrideAttr(os, "fsync")
462
 
        self.do_fdatasync()
463
 
 
464
 
    def test_fdatasync_handles_no_EOPNOTSUPP(self):
465
 
        self.overrideAttr(errno, "EOPNOTSUPP")
466
 
        self.do_fdatasync()
467
 
 
468
 
    def test_fdatasync_catches_ENOTSUP(self):
469
 
        enotsup = getattr(errno, "ENOTSUP", None)
470
 
        if enotsup is None:
471
 
            raise tests.TestNotApplicable("No ENOTSUP on this platform")
472
 
        self.overrideAttr(os, "fdatasync", self.raise_enotsup)
473
 
        self.do_fdatasync()
474
 
 
475
 
    def test_fdatasync_catches_EOPNOTSUPP(self):
476
 
        enotsup = getattr(errno, "EOPNOTSUPP", None)
477
 
        if enotsup is None:
478
 
            raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
479
 
        self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
480
 
        self.do_fdatasync()
481
 
 
482
 
 
483
430
class TestLinks(tests.TestCaseInTempDir):
484
431
 
485
432
    def test_dereference_path(self):
598
545
    """Test pumpfile method."""
599
546
 
600
547
    def setUp(self):
601
 
        super(TestPumpFile, self).setUp()
 
548
        tests.TestCase.setUp(self)
602
549
        # create a test datablock
603
550
        self.block_size = 512
604
551
        pattern = '0123456789ABCDEF'
872
819
        self.assertEqual(None, osutils.safe_file_id(None))
873
820
 
874
821
 
875
 
class TestSendAll(tests.TestCase):
876
 
 
877
 
    def test_send_with_disconnected_socket(self):
878
 
        class DisconnectedSocket(object):
879
 
            def __init__(self, err):
880
 
                self.err = err
881
 
            def send(self, content):
882
 
                raise self.err
883
 
            def close(self):
884
 
                pass
885
 
        # All of these should be treated as ConnectionReset
886
 
        errs = []
887
 
        for err_cls in (IOError, socket.error):
888
 
            for errnum in osutils._end_of_stream_errors:
889
 
                errs.append(err_cls(errnum))
890
 
        for err in errs:
891
 
            sock = DisconnectedSocket(err)
892
 
            self.assertRaises(errors.ConnectionReset,
893
 
                osutils.send_all, sock, 'some more content')
894
 
 
895
 
    def test_send_with_no_progress(self):
896
 
        # See https://bugs.launchpad.net/bzr/+bug/1047309
897
 
        # It seems that paramiko can get into a state where it doesn't error,
898
 
        # but it returns 0 bytes sent for requests over and over again.
899
 
        class NoSendingSocket(object):
900
 
            def __init__(self):
901
 
                self.call_count = 0
902
 
            def send(self, bytes):
903
 
                self.call_count += 1
904
 
                if self.call_count > 100:
905
 
                    # Prevent the test suite from hanging
906
 
                    raise RuntimeError('too many calls')
907
 
                return 0
908
 
        sock = NoSendingSocket()
909
 
        self.assertRaises(errors.ConnectionReset,
910
 
                          osutils.send_all, sock, 'content')
911
 
        self.assertEqual(1, sock.call_count)
912
 
 
913
 
 
914
822
class TestPosixFuncs(tests.TestCase):
915
823
    """Test that the posix version of normpath returns an appropriate path
916
824
       when used with 2 leading slashes."""
925
833
    """Test that _win32 versions of os utilities return appropriate paths."""
926
834
 
927
835
    def test_abspath(self):
928
 
        self.requireFeature(features.win32_feature)
929
836
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
930
837
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
931
838
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
944
851
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
945
852
        self.assertEqual('path/to/foo',
946
853
                         osutils._win32_pathjoin('path/to/', 'foo'))
947
 
 
948
 
    def test_pathjoin_late_bugfix(self):
949
 
        if sys.version_info < (2, 7, 6):
950
 
            expected = '/foo'
951
 
        else:
952
 
            expected = 'C:/foo'
953
 
        self.assertEqual(expected,
 
854
        self.assertEqual('/foo',
954
855
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
955
 
        self.assertEqual(expected,
 
856
        self.assertEqual('/foo',
956
857
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
957
858
 
958
859
    def test_normpath(self):
978
879
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
979
880
 
980
881
    def test_win98_abspath(self):
981
 
        self.requireFeature(features.win32_feature)
982
882
        # absolute path
983
883
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
984
884
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
1205
1105
        # (It would be ok if it happened earlier but at the moment it
1206
1106
        # doesn't.)
1207
1107
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1208
 
        self.assertEqual('./test-unreadable', e.filename)
1209
 
        self.assertEqual(errno.EACCES, e.errno)
 
1108
        self.assertEquals('./test-unreadable', e.filename)
 
1109
        self.assertEquals(errno.EACCES, e.errno)
1210
1110
        # Ensure the message contains the file name
1211
1111
        self.assertContainsRe(str(e), "\./test-unreadable")
1212
1112
 
1862
1762
    _native_to_unicode = None
1863
1763
 
1864
1764
    def setUp(self):
1865
 
        super(TestDirReader, self).setUp()
 
1765
        tests.TestCaseInTempDir.setUp(self)
1866
1766
        self.overrideAttr(osutils,
1867
1767
                          '_selected_dir_reader', self._dir_reader_class())
1868
1768
 
2003
1903
        os.symlink(self.target, self.link)
2004
1904
 
2005
1905
    def test_os_readlink_link_encoding(self):
2006
 
        self.assertEqual(self.target,  os.readlink(self.link))
 
1906
        self.assertEquals(self.target,  os.readlink(self.link))
2007
1907
 
2008
1908
    def test_os_readlink_link_decoding(self):
2009
 
        self.assertEqual(self.target.encode(osutils._fs_enc),
 
1909
        self.assertEquals(self.target.encode(osutils._fs_enc),
2010
1910
                          os.readlink(self.link.encode(osutils._fs_enc)))
2011
1911
 
2012
1912
 
2032
1932
        self.overrideEnv('BZR_CONCURRENCY', '1')
2033
1933
        self.run_bzr('rocks --concurrency 42')
2034
1934
        # Command line overrides environment variable
2035
 
        self.assertEqual('42', os.environ['BZR_CONCURRENCY'])
2036
 
        self.assertEqual(42, osutils.local_concurrency(use_cache=False))
 
1935
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
 
1936
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
2037
1937
 
2038
1938
 
2039
1939
class TestFailedToLoadExtension(tests.TestCase):
2052
1952
    def test_failure_to_load(self):
2053
1953
        self._try_loading()
2054
1954
        self.assertLength(1, osutils._extension_load_failures)
2055
 
        self.assertEqual(osutils._extension_load_failures[0],
 
1955
        self.assertEquals(osutils._extension_load_failures[0],
2056
1956
            "No module named _fictional_extension_py")
2057
1957
 
2058
1958
    def test_report_extension_load_failures_no_warning(self):
2076
1976
class TestTerminalWidth(tests.TestCase):
2077
1977
 
2078
1978
    def setUp(self):
2079
 
        super(TestTerminalWidth, self).setUp()
 
1979
        tests.TestCase.setUp(self)
2080
1980
        self._orig_terminal_size_state = osutils._terminal_size_state
2081
1981
        self._orig_first_terminal_size = osutils._first_terminal_size
2082
1982
        self.addCleanup(self.restore_osutils_globals)
2163
2063
    _test_needs_features = [features.chown_feature]
2164
2064
 
2165
2065
    def setUp(self):
2166
 
        super(TestCreationOps, self).setUp()
 
2066
        tests.TestCaseInTempDir.setUp(self)
2167
2067
        self.overrideAttr(os, 'chown', self._dummy_chown)
2168
2068
 
2169
2069
        # params set by call to _dummy_chown
2179
2079
        osutils.copy_ownership_from_path('test_file', ownsrc)
2180
2080
 
2181
2081
        s = os.stat(ownsrc)
2182
 
        self.assertEqual(self.path, 'test_file')
2183
 
        self.assertEqual(self.uid, s.st_uid)
2184
 
        self.assertEqual(self.gid, s.st_gid)
 
2082
        self.assertEquals(self.path, 'test_file')
 
2083
        self.assertEquals(self.uid, s.st_uid)
 
2084
        self.assertEquals(self.gid, s.st_gid)
2185
2085
 
2186
2086
    def test_copy_ownership_nonesrc(self):
2187
2087
        """copy_ownership_from_path test with src=None."""
2190
2090
        osutils.copy_ownership_from_path('test_file')
2191
2091
 
2192
2092
        s = os.stat('..')
2193
 
        self.assertEqual(self.path, 'test_file')
2194
 
        self.assertEqual(self.uid, s.st_uid)
2195
 
        self.assertEqual(self.gid, s.st_gid)
 
2093
        self.assertEquals(self.path, 'test_file')
 
2094
        self.assertEquals(self.uid, s.st_uid)
 
2095
        self.assertEquals(self.gid, s.st_gid)
2196
2096
 
2197
2097
 
2198
2098
class TestPathFromEnviron(tests.TestCase):