1
# Copyright (C) 2005 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for the osutils wrapper."""
26
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
27
import bzrlib.osutils as osutils
28
from bzrlib.tests import (
36
class TestOSUtils(TestCaseInTempDir):
38
def test_fancy_rename(self):
39
# This should work everywhere
41
osutils.fancy_rename(a, b,
42
rename_func=os.rename,
43
unlink_func=os.unlink)
45
open('a', 'wb').write('something in a\n')
47
self.failIfExists('a')
48
self.failUnlessExists('b')
49
self.check_file_contents('b', 'something in a\n')
51
open('a', 'wb').write('new something in a\n')
54
self.check_file_contents('a', 'something in a\n')
56
def test_rename(self):
    """osutils.rename moves a file and replaces an existing target.

    Rename should be semi-atomic on all platforms.
    """
    # Close the handle explicitly so the rename never races with a
    # still-open file.
    f = open('a', 'wb')
    try:
        f.write('something in a\n')
    finally:
        f.close()
    osutils.rename('a', 'b')
    self.failIfExists('a')
    self.failUnlessExists('b')
    self.check_file_contents('b', 'something in a\n')

    # Recreate 'a' with different content; renaming 'b' onto it must
    # leave the content that came from 'b'.
    f = open('a', 'wb')
    try:
        f.write('new something in a\n')
    finally:
        f.close()
    osutils.rename('b', 'a')

    self.check_file_contents('a', 'something in a\n')
69
# TODO: test fancy_rename using a MemoryTransport
71
def test_01_rand_chars_empty(self):
    """Asking rand_chars for zero characters yields the empty string."""
    result = osutils.rand_chars(0)
    self.assertEqual(result, '')
75
def test_02_rand_chars_100(self):
    """rand_chars(100) returns a 100-character str of [a-z0-9] only."""
    result = osutils.rand_chars(100)
    self.assertEqual(len(result), 100)
    self.assertEqual(type(result), str)
    self.assertContainsRe(result, r'^[a-z0-9]{100}$')
82
def test_rmtree(self):
83
# Check to remove tree with read-only files/dirs
85
f = file('dir/file', 'w')
88
# would like to also try making the directory readonly, but at the
89
# moment python shutil.rmtree doesn't handle that properly - it would
90
# need to chmod the directory before removing things inside it - deferred
91
# for now -- mbp 20060505
92
# osutils.make_readonly('dir')
93
osutils.make_readonly('dir/file')
97
self.failIfExists('dir/file')
98
self.failIfExists('dir')
100
def test_file_kind(self):
101
self.build_tree(['file', 'dir/'])
102
self.assertEquals('file', osutils.file_kind('file'))
103
self.assertEquals('directory', osutils.file_kind('dir/'))
104
if osutils.has_symlinks():
105
os.symlink('symlink', 'symlink')
106
self.assertEquals('symlink', osutils.file_kind('symlink'))
108
# TODO: jam 20060529 Test a block device
110
os.lstat('/dev/null')
112
if e.errno not in (errno.ENOENT,):
115
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
117
mkfifo = getattr(os, 'mkfifo', None)
121
self.assertEquals('fifo', osutils.file_kind('fifo'))
125
AF_UNIX = getattr(socket, 'AF_UNIX', None)
127
s = socket.socket(AF_UNIX)
130
self.assertEquals('socket', osutils.file_kind('socket'))
134
def test_get_umask(self):
135
if sys.platform == 'win32':
136
# umask always returns '0', no way to set it
137
self.assertEqual(0, osutils.get_umask())
140
orig_umask = osutils.get_umask()
143
self.assertEqual(0222, osutils.get_umask())
145
self.assertEqual(0022, osutils.get_umask())
147
self.assertEqual(0002, osutils.get_umask())
149
self.assertEqual(0027, osutils.get_umask())
153
def assertFormatedDelta(self, expected, seconds):
    """Assert osutils.format_delta formats as expected.

    :param expected: The string format_delta should return.
    :param seconds: The delta, in seconds, passed to format_delta.
    """
    actual = osutils.format_delta(seconds)
    self.assertEqual(expected, actual)
158
def test_format_delta(self):
    """format_delta renders second, minute and hour ranges, both signs."""
    # Deltas up to 89 seconds are reported in seconds only.
    self.assertFormatedDelta('0 seconds ago', 0)
    self.assertFormatedDelta('1 second ago', 1)
    self.assertFormatedDelta('10 seconds ago', 10)
    self.assertFormatedDelta('59 seconds ago', 59)
    self.assertFormatedDelta('89 seconds ago', 89)
    # From 90 seconds up to 89 minutes 59 seconds: minutes and seconds.
    self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
    self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
    self.assertFormatedDelta('3 minutes, 1 second ago', 181)
    self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
    self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
    self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
    self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
    self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
    # From 90 minutes on: hours and minutes, leftover seconds dropped.
    self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
    self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
    self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
    self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
    self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
    self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
    self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
    self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
    self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
    self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)

    # We handle when time steps the wrong direction because computers
    # don't have synchronized clocks.
    self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
    self.assertFormatedDelta('1 second in the future', -1)
    self.assertFormatedDelta('2 seconds in the future', -2)
190
class TestSafeUnicode(TestCase):
192
def test_from_ascii_string(self):
    """A plain ASCII string comes back as the equal unicode string."""
    self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
195
def test_from_unicode_string_ascii_contents(self):
    """A unicode string with ASCII-only contents is returned unchanged."""
    self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
198
def test_from_unicode_string_unicode_contents(self):
    """A unicode string with a non-ASCII character is returned unchanged."""
    self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
201
def test_from_utf8_string(self):
    """The UTF-8 bytes '\\xc2\\xae' are decoded to the character u'\\xae'."""
    self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
204
def test_bad_utf8_string(self):
205
self.assertRaises(BzrBadParameterNotUnicode,
206
osutils.safe_unicode,
210
class TestWin32Funcs(TestCase):
    """Test that the _win32 versions of os utilities return appropriate paths."""

    def test_abspath(self):
        """Backslash and forward-slash inputs both give 'C:/foo'."""
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))

    def test_realpath(self):
        """Backslash and forward-slash inputs both give 'C:/foo'."""
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))

    def test_pathjoin(self):
        """Joined paths use '/', and a later absolute part wins."""
        self.assertEqual('path/to/foo',
                         osutils._win32_pathjoin('path', 'to', 'foo'))
        # A drive-qualified component discards everything before it.
        self.assertEqual('C:/foo',
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
        self.assertEqual('C:/foo',
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
        self.assertEqual('path/to/foo',
                         osutils._win32_pathjoin('path/to/', 'foo'))
        # A rooted component without a drive also discards the prefix,
        # including the prefix's drive letter.
        self.assertEqual('/foo',
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
        self.assertEqual('/foo',
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))

    def test_normpath(self):
        """'..', '.' and doubled separators collapse to 'path/to/foo'."""
        self.assertEqual('path/to/foo',
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
        self.assertEqual('path/to/foo',
                         osutils._win32_normpath('path//from/../to/./foo'))

    def test_getcwd(self):
        """_win32_getcwd matches os.getcwdu() with '/' and an upper drive."""
        cwd = osutils._win32_getcwd()
        os_cwd = os.getcwdu()
        # Compare everything after the first character; the first
        # character (the drive letter) is checked separately below.
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
        # win32 is inconsistent whether it returns lower or upper case
        # and even if it was consistent the user might type the other
        # so we force it to uppercase
        # running python.exe under cmd.exe return capital C:\\
        # running win32 python inside a cygwin shell returns lowercase
        self.assertEqual(os_cwd[0].upper(), cwd[0])

    def test_fixdrive(self):
        """Only the drive letter is upper-cased; separators are kept."""
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
250
class TestWin32FuncsDirs(TestCaseInTempDir):
251
"""Test win32 functions that create files."""
253
def test_getcwd(self):
254
# Make sure getcwd can handle unicode filenames
258
raise TestSkipped("Unable to create Unicode filename")
261
# TODO: jam 20060427 This will probably fail on Mac OSX because
262
# it will change the normalization of B\xe5gfors
263
# Consider using a different unicode character, or make
264
# osutils.getcwd() renormalize the path.
265
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
267
def test_mkdtemp(self):
    """The path returned by _win32_mkdtemp contains no backslashes."""
    tmpdir = osutils._win32_mkdtemp(dir='.')
    self.assertFalse('\\' in tmpdir)
271
def test_rename(self):
279
osutils._win32_rename('b', 'a')
280
self.failUnlessExists('a')
281
self.failIfExists('b')
282
self.assertFileEqual('baz\n', 'a')
284
def test_rename_missing_file(self):
290
osutils._win32_rename('b', 'a')
291
except (IOError, OSError), e:
292
self.assertEqual(errno.ENOENT, e.errno)
293
self.assertFileEqual('foo\n', 'a')
295
def test_rename_missing_dir(self):
298
osutils._win32_rename('b', 'a')
299
except (IOError, OSError), e:
300
self.assertEqual(errno.ENOENT, e.errno)
302
def test_rename_current_dir(self):
305
# You can't rename the working directory
306
# doing rename nonexistent . usually
307
# just raises ENOENT, since nonexistent
310
osutils._win32_rename('b', '.')
311
except (IOError, OSError), e:
312
self.assertEqual(errno.ENOENT, e.errno)
315
class TestMacFuncsDirs(TestCaseInTempDir):
316
"""Test mac special functions that require directories."""
318
def test_getcwd(self):
319
# On Mac, this will actually create Ba\u030agfors
320
# but chdir will still work, because it accepts both paths
322
os.mkdir(u'B\xe5gfors')
324
raise TestSkipped("Unable to create Unicode filename")
326
os.chdir(u'B\xe5gfors')
327
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
329
def test_getcwd_nonnorm(self):
330
# Test that _mac_getcwd() will normalize this path
332
os.mkdir(u'Ba\u030agfors')
334
raise TestSkipped("Unable to create Unicode filename")
336
os.chdir(u'Ba\u030agfors')
337
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
339
class TestSplitLines(TestCase):
    """Tests for osutils.split_lines."""

    def test_split_unicode(self):
        """Unicode text splits after each '\\n', with or without a final one."""
        self.assertEqual([u'foo\n', u'bar\xae'],
                         osutils.split_lines(u'foo\nbar\xae'))
        self.assertEqual([u'foo\n', u'bar\xae\n'],
                         osutils.split_lines(u'foo\nbar\xae\n'))

    def test_split_with_carriage_returns(self):
        """A bare '\\r' is not a line break; only '\\n' ends a line."""
        self.assertEqual(['foo\rbar\n'],
                         osutils.split_lines('foo\rbar\n'))
352
class TestWalkDirs(TestCaseInTempDir):
354
def test_walkdirs(self):
363
self.build_tree(tree)
364
expected_dirblocks = [
366
[('0file', '0file', 'file'),
367
('1dir', '1dir', 'directory'),
368
('2file', '2file', 'file'),
372
[('1dir/0file', '0file', 'file'),
373
('1dir/1dir', '1dir', 'directory'),
376
(('1dir/1dir', './1dir/1dir'),
383
for dirdetail, dirblock in osutils.walkdirs('.'):
384
if len(dirblock) and dirblock[0][1] == '.bzr':
385
# this tests the filtering of selected paths
388
result.append((dirdetail, dirblock))
390
self.assertTrue(found_bzrdir)
391
self.assertEqual(expected_dirblocks,
392
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
393
# you can search a subdir only, with a supplied prefix.
395
for dirblock in osutils.walkdirs('./1dir', '1dir'):
396
result.append(dirblock)
397
self.assertEqual(expected_dirblocks[1:],
398
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
400
def assertPathCompare(self, path_less, path_greater):
    """Check that path_less and path_greater compare correctly.

    :param path_less: The path expected to sort first.
    :param path_greater: The path expected to sort after path_less.
    """
    compare = osutils.compare_paths_prefix_order
    # Each path must compare equal to itself.
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    # The ordering must hold in both argument orders.
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
411
def test_compare_paths_prefix_order(self):
    """Check prefix ordering for paths with and without a leading '/'."""
    # root before all else
    self.assertPathCompare("/", "/a")
    self.assertPathCompare("/a", "/b")
    self.assertPathCompare("/b", "/z")
    # high dirs before lower.
    self.assertPathCompare("/z", "/a/a")
    # except if the deeper dir should be output first
    self.assertPathCompare("/a/b/c", "/d/g")
    # lexical between dirs of the same height
    self.assertPathCompare("/a/z", "/z/z")
    self.assertPathCompare("/a/c/z", "/a/d/e")

    # this should also be consistent for no leading / paths
    # root before all else
    self.assertPathCompare("", "a")
    self.assertPathCompare("a", "b")
    self.assertPathCompare("b", "z")
    # high dirs before lower.
    self.assertPathCompare("z", "a/a")
    # except if the deeper dir should be output first
    self.assertPathCompare("a/b/c", "d/g")
    # lexical between dirs of the same height
    self.assertPathCompare("a/z", "z/z")
    self.assertPathCompare("a/c/z", "a/d/e")
439
def test_path_prefix_sorting(self):
440
"""Doing a sort on path prefix should match our sample data."""
471
sorted(original_paths, key=osutils.path_prefix_key))
472
# using the comparison routine should work too:
475
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
478
class TestCopyTree(TestCaseInTempDir):
480
def test_copy_basic_tree(self):
    """copy_tree reproduces files and nested directories in the target."""
    self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
    osutils.copy_tree('source', 'target')
    # os.listdir returns entries in arbitrary order, so sort before
    # comparing against the expected names.
    self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
    self.assertEqual(['c'], os.listdir('target/b'))
486
def test_copy_tree_target_exists(self):
487
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
489
osutils.copy_tree('source', 'target')
490
self.assertEqual(['a', 'b'], os.listdir('target'))
491
self.assertEqual(['c'], os.listdir('target/b'))
493
def test_copy_tree_symlinks(self):
494
if not osutils.has_symlinks():
496
self.build_tree(['source/'])
497
os.symlink('a/generic/path', 'source/lnk')
498
osutils.copy_tree('source', 'target')
499
self.assertEqual(['lnk'], os.listdir('target'))
500
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
502
def test_copy_tree_handlers(self):
505
def file_handler(from_path, to_path):
506
processed_files.append(('f', from_path, to_path))
507
def dir_handler(from_path, to_path):
508
processed_files.append(('d', from_path, to_path))
509
def link_handler(from_path, to_path):
510
processed_links.append((from_path, to_path))
511
handlers = {'file':file_handler,
512
'directory':dir_handler,
513
'symlink':link_handler,
516
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
517
if osutils.has_symlinks():
518
os.symlink('a/generic/path', 'source/lnk')
519
osutils.copy_tree('source', 'target', handlers=handlers)
521
self.assertEqual([('d', 'source', 'target'),
522
('f', 'source/a', 'target/a'),
523
('d', 'source/b', 'target/b'),
524
('f', 'source/b/c', 'target/b/c'),
526
self.failIfExists('target')
527
if osutils.has_symlinks():
528
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
531
class TestTerminalEncoding(TestCase):
532
"""Test the auto-detection of proper terminal encoding."""
535
self._stdout = sys.stdout
536
self._stderr = sys.stderr
537
self._stdin = sys.stdin
538
self._user_encoding = bzrlib.user_encoding
540
self.addCleanup(self._reset)
542
sys.stdout = StringIOWrapper()
543
sys.stdout.encoding = 'stdout_encoding'
544
sys.stderr = StringIOWrapper()
545
sys.stderr.encoding = 'stderr_encoding'
546
sys.stdin = StringIOWrapper()
547
sys.stdin.encoding = 'stdin_encoding'
548
bzrlib.user_encoding = 'user_encoding'
551
sys.stdout = self._stdout
552
sys.stderr = self._stderr
553
sys.stdin = self._stdin
554
bzrlib.user_encoding = self._user_encoding
556
def test_get_terminal_encoding(self):
    """get_terminal_encoding falls back stdout -> stdin -> user_encoding."""
    # first preference is stdout encoding
    self.assertEqual('stdout_encoding', osutils.get_terminal_encoding())

    sys.stdout.encoding = None
    # if sys.stdout.encoding is None, fall back to sys.stdin
    self.assertEqual('stdin_encoding', osutils.get_terminal_encoding())

    sys.stdin.encoding = None
    # and in the worst case, use bzrlib.user_encoding
    self.assertEqual('user_encoding', osutils.get_terminal_encoding())
569
class TestSetUnsetEnv(TestCase):
570
"""Test updating the environment"""
573
super(TestSetUnsetEnv, self).setUp()
575
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
576
'Environment was not cleaned up properly.'
577
' Variable BZR_TEST_ENV_VAR should not exist.')
579
if 'BZR_TEST_ENV_VAR' in os.environ:
580
del os.environ['BZR_TEST_ENV_VAR']
582
self.addCleanup(cleanup)
585
"""Test that we can set an env variable"""
586
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
587
self.assertEqual(None, old)
588
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
590
def test_double_set(self):
    """Test that we get the old value out"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    # The second call must return the value stored by the first one
    # and leave the new value in the environment.
    old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
    self.assertEqual('foo', old)
    self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
597
def test_unicode(self):
598
"""Environment can only contain plain strings
600
So Unicode strings must be encoded.
602
# Try a few different characters, to see if we can get
603
# one that will be valid in the user_encoding
604
possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
605
for uni_val in possible_vals:
607
env_val = uni_val.encode(bzrlib.user_encoding)
608
except UnicodeEncodeError:
609
# Try a different character
614
raise TestSkipped('Cannot find a unicode character that works in'
615
' encoding %s' % (bzrlib.user_encoding,))
617
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
618
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
620
def test_unset(self):
    """Test that passing None will remove the env var"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
    self.assertEqual('foo', old)
    # The variable must be gone entirely, not merely set to an empty
    # value: check both the lookup and the membership test.
    self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
    self.failIf('BZR_TEST_ENV_VAR' in os.environ)