1
# Copyright (C) 2005-2016 Canonical Ltd
1
# Copyright (C) 2005-2011 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
180
179
# we can't use failUnlessExists on case-insensitive filesystem
181
180
# so try to check shape of the tree
182
181
shape = sorted(os.listdir('.'))
183
self.assertEqual(['A', 'B'], shape)
185
def test_rename_exception(self):
187
osutils.rename('nonexistent_path', 'different_nonexistent_path')
189
self.assertEqual(e.old_filename, 'nonexistent_path')
190
self.assertEqual(e.new_filename, 'different_nonexistent_path')
191
self.assertTrue('nonexistent_path' in e.strerror)
192
self.assertTrue('different_nonexistent_path' in e.strerror)
182
self.assertEquals(['A', 'B'], shape)
195
185
class TestRandChars(tests.TestCase):
222
212
(['src'], SRC_FOO_C),
223
213
(['src'], 'src'),
225
self.assertTrue(osutils.is_inside_any(dirs, fn))
215
self.assert_(osutils.is_inside_any(dirs, fn))
226
216
for dirs, fn in [(['src'], 'srccontrol'),
227
217
(['src'], 'srccontrol/foo')]:
228
218
self.assertFalse(osutils.is_inside_any(dirs, fn))
234
224
(['src/bar.c', 'bla/foo.c'], 'src'),
235
225
(['src'], 'src'),
237
self.assertTrue(osutils.is_inside_or_parent_of_any(dirs, fn))
227
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
239
229
for dirs, fn in [(['src'], 'srccontrol'),
240
230
(['srccontrol/foo.c'], 'src'),
299
289
def test_file_kind(self):
300
290
self.build_tree(['file', 'dir/'])
301
self.assertEqual('file', osutils.file_kind('file'))
302
self.assertEqual('directory', osutils.file_kind('dir/'))
291
self.assertEquals('file', osutils.file_kind('file'))
292
self.assertEquals('directory', osutils.file_kind('dir/'))
303
293
if osutils.has_symlinks():
304
294
os.symlink('symlink', 'symlink')
305
self.assertEqual('symlink', osutils.file_kind('symlink'))
295
self.assertEquals('symlink', osutils.file_kind('symlink'))
307
297
# TODO: jam 20060529 Test a block device
311
301
if e.errno not in (errno.ENOENT,):
314
self.assertEqual('chardev', osutils.file_kind('/dev/null'))
304
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
316
306
mkfifo = getattr(os, 'mkfifo', None)
320
self.assertEqual('fifo', osutils.file_kind('fifo'))
310
self.assertEquals('fifo', osutils.file_kind('fifo'))
322
312
os.remove('fifo')
437
427
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
440
class TestFdatasync(tests.TestCaseInTempDir):
442
def do_fdatasync(self):
443
f = tempfile.NamedTemporaryFile()
444
osutils.fdatasync(f.fileno())
448
def raise_eopnotsupp(*args, **kwargs):
449
raise IOError(errno.EOPNOTSUPP, os.strerror(errno.EOPNOTSUPP))
452
def raise_enotsup(*args, **kwargs):
453
raise IOError(errno.ENOTSUP, os.strerror(errno.ENOTSUP))
455
def test_fdatasync_handles_system_function(self):
456
self.overrideAttr(os, "fdatasync")
459
def test_fdatasync_handles_no_fdatasync_no_fsync(self):
460
self.overrideAttr(os, "fdatasync")
461
self.overrideAttr(os, "fsync")
464
def test_fdatasync_handles_no_EOPNOTSUPP(self):
465
self.overrideAttr(errno, "EOPNOTSUPP")
468
def test_fdatasync_catches_ENOTSUP(self):
469
enotsup = getattr(errno, "ENOTSUP", None)
471
raise tests.TestNotApplicable("No ENOTSUP on this platform")
472
self.overrideAttr(os, "fdatasync", self.raise_enotsup)
475
def test_fdatasync_catches_EOPNOTSUPP(self):
476
enotsup = getattr(errno, "EOPNOTSUPP", None)
478
raise tests.TestNotApplicable("No EOPNOTSUPP on this platform")
479
self.overrideAttr(os, "fdatasync", self.raise_eopnotsupp)
483
430
class TestLinks(tests.TestCaseInTempDir):
485
432
def test_dereference_path(self):
598
545
"""Test pumpfile method."""
601
super(TestPumpFile, self).setUp()
548
tests.TestCase.setUp(self)
602
549
# create a test datablock
603
550
self.block_size = 512
604
551
pattern = '0123456789ABCDEF'
872
819
self.assertEqual(None, osutils.safe_file_id(None))
875
class TestSendAll(tests.TestCase):
877
def test_send_with_disconnected_socket(self):
878
class DisconnectedSocket(object):
879
def __init__(self, err):
881
def send(self, content):
885
# All of these should be treated as ConnectionReset
887
for err_cls in (IOError, socket.error):
888
for errnum in osutils._end_of_stream_errors:
889
errs.append(err_cls(errnum))
891
sock = DisconnectedSocket(err)
892
self.assertRaises(errors.ConnectionReset,
893
osutils.send_all, sock, 'some more content')
895
def test_send_with_no_progress(self):
896
# See https://bugs.launchpad.net/bzr/+bug/1047309
897
# It seems that paramiko can get into a state where it doesn't error,
898
# but it returns 0 bytes sent for requests over and over again.
899
class NoSendingSocket(object):
902
def send(self, bytes):
904
if self.call_count > 100:
905
# Prevent the test suite from hanging
906
raise RuntimeError('too many calls')
908
sock = NoSendingSocket()
909
self.assertRaises(errors.ConnectionReset,
910
osutils.send_all, sock, 'content')
911
self.assertEqual(1, sock.call_count)
914
822
class TestPosixFuncs(tests.TestCase):
915
823
"""Test that the posix version of normpath returns an appropriate path
916
824
when used with 2 leading slashes."""
925
833
"""Test that _win32 versions of os utilities return appropriate paths."""
927
835
def test_abspath(self):
928
self.requireFeature(features.win32_feature)
929
836
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
930
837
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
931
838
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
944
851
osutils._win32_pathjoin('path/to', 'C:/foo'))
945
852
self.assertEqual('path/to/foo',
946
853
osutils._win32_pathjoin('path/to/', 'foo'))
948
def test_pathjoin_late_bugfix(self):
949
if sys.version_info < (2, 7, 6):
953
self.assertEqual(expected,
854
self.assertEqual('/foo',
954
855
osutils._win32_pathjoin('C:/path/to/', '/foo'))
955
self.assertEqual(expected,
856
self.assertEqual('/foo',
956
857
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
958
859
def test_normpath(self):
978
879
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
980
881
def test_win98_abspath(self):
981
self.requireFeature(features.win32_feature)
983
883
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
984
884
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
1205
1105
# (It would be ok if it happened earlier but at the moment it
1207
1107
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1208
self.assertEqual('./test-unreadable', e.filename)
1209
self.assertEqual(errno.EACCES, e.errno)
1108
self.assertEquals('./test-unreadable', e.filename)
1109
self.assertEquals(errno.EACCES, e.errno)
1210
1110
# Ensure the message contains the file name
1211
1111
self.assertContainsRe(str(e), "\./test-unreadable")
1862
1762
_native_to_unicode = None
1864
1764
def setUp(self):
1865
super(TestDirReader, self).setUp()
1765
tests.TestCaseInTempDir.setUp(self)
1866
1766
self.overrideAttr(osutils,
1867
1767
'_selected_dir_reader', self._dir_reader_class())
2003
1903
os.symlink(self.target, self.link)
2005
1905
def test_os_readlink_link_encoding(self):
2006
self.assertEqual(self.target, os.readlink(self.link))
1906
self.assertEquals(self.target, os.readlink(self.link))
2008
1908
def test_os_readlink_link_decoding(self):
2009
self.assertEqual(self.target.encode(osutils._fs_enc),
1909
self.assertEquals(self.target.encode(osutils._fs_enc),
2010
1910
os.readlink(self.link.encode(osutils._fs_enc)))
2032
1932
self.overrideEnv('BZR_CONCURRENCY', '1')
2033
1933
self.run_bzr('rocks --concurrency 42')
2034
1934
# Command line overrides environment variable
2035
self.assertEqual('42', os.environ['BZR_CONCURRENCY'])
2036
self.assertEqual(42, osutils.local_concurrency(use_cache=False))
1935
self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1936
self.assertEquals(42, osutils.local_concurrency(use_cache=False))
2039
1939
class TestFailedToLoadExtension(tests.TestCase):
2052
1952
def test_failure_to_load(self):
2053
1953
self._try_loading()
2054
1954
self.assertLength(1, osutils._extension_load_failures)
2055
self.assertEqual(osutils._extension_load_failures[0],
1955
self.assertEquals(osutils._extension_load_failures[0],
2056
1956
"No module named _fictional_extension_py")
2058
1958
def test_report_extension_load_failures_no_warning(self):
2076
1976
class TestTerminalWidth(tests.TestCase):
2078
1978
def setUp(self):
2079
super(TestTerminalWidth, self).setUp()
1979
tests.TestCase.setUp(self)
2080
1980
self._orig_terminal_size_state = osutils._terminal_size_state
2081
1981
self._orig_first_terminal_size = osutils._first_terminal_size
2082
1982
self.addCleanup(self.restore_osutils_globals)
2163
2063
_test_needs_features = [features.chown_feature]
2165
2065
def setUp(self):
2166
super(TestCreationOps, self).setUp()
2066
tests.TestCaseInTempDir.setUp(self)
2167
2067
self.overrideAttr(os, 'chown', self._dummy_chown)
2169
2069
# params set by call to _dummy_chown
2179
2079
osutils.copy_ownership_from_path('test_file', ownsrc)
2181
2081
s = os.stat(ownsrc)
2182
self.assertEqual(self.path, 'test_file')
2183
self.assertEqual(self.uid, s.st_uid)
2184
self.assertEqual(self.gid, s.st_gid)
2082
self.assertEquals(self.path, 'test_file')
2083
self.assertEquals(self.uid, s.st_uid)
2084
self.assertEquals(self.gid, s.st_gid)
2186
2086
def test_copy_ownership_nonesrc(self):
2187
2087
"""copy_ownership_from_path test with src=None."""
2190
2090
osutils.copy_ownership_from_path('test_file')
2192
2092
s = os.stat('..')
2193
self.assertEqual(self.path, 'test_file')
2194
self.assertEqual(self.uid, s.st_uid)
2195
self.assertEqual(self.gid, s.st_gid)
2093
self.assertEquals(self.path, 'test_file')
2094
self.assertEquals(self.uid, s.st_uid)
2095
self.assertEquals(self.gid, s.st_gid)
2198
2098
class TestPathFromEnviron(tests.TestCase):