1
# Copyright (C) 2005-2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
37
from bzrlib.tests import (
42
from bzrlib.tests.scenarios import load_tests_apply_scenarios
45
class _UTF8DirReaderFeature(features.Feature):
49
from bzrlib import _readdir_pyx
50
self.reader = _readdir_pyx.UTF8DirReader
55
def feature_name(self):
56
return 'bzrlib._readdir_pyx'
58
UTF8DirReaderFeature = features.ModuleAvailableFeature('bzrlib._readdir_pyx')
60
term_ios_feature = features.ModuleAvailableFeature('termios')
63
def _already_unicode(s):
67
def _utf8_to_unicode(s):
68
return s.decode('UTF-8')
71
def dir_reader_scenarios():
72
# For each dir reader we define:
74
# - native_to_unicode: a function converting the native_abspath as returned
75
# by DirReader.read_dir to its unicode representation
77
# UnicodeDirReader is the fallback, it should be tested on all platforms.
78
scenarios = [('unicode',
79
dict(_dir_reader_class=osutils.UnicodeDirReader,
80
_native_to_unicode=_already_unicode))]
81
# Some DirReaders are platform specific and even there they may not be
83
if UTF8DirReaderFeature.available():
84
from bzrlib import _readdir_pyx
85
scenarios.append(('utf8',
86
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
87
_native_to_unicode=_utf8_to_unicode)))
89
if test__walkdirs_win32.win32_readdir_feature.available():
91
from bzrlib import _walkdirs_win32
94
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
95
_native_to_unicode=_already_unicode)))
101
load_tests = load_tests_apply_scenarios
104
class TestContainsWhitespace(tests.TestCase):
106
def test_contains_whitespace(self):
107
self.assertTrue(osutils.contains_whitespace(u' '))
108
self.assertTrue(osutils.contains_whitespace(u'hello there'))
109
self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
110
self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
111
self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
112
self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))
114
# \xa0 is "Non-breaking-space" which on some python locales thinks it
115
# is whitespace, but we do not.
116
self.assertFalse(osutils.contains_whitespace(u''))
117
self.assertFalse(osutils.contains_whitespace(u'hellothere'))
118
self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
121
class TestRename(tests.TestCaseInTempDir):
123
def create_file(self, filename, content):
124
f = open(filename, 'wb')
130
def _fancy_rename(self, a, b):
131
osutils.fancy_rename(a, b, rename_func=os.rename,
132
unlink_func=os.unlink)
134
def test_fancy_rename(self):
135
# This should work everywhere
136
self.create_file('a', 'something in a\n')
137
self._fancy_rename('a', 'b')
138
self.assertPathDoesNotExist('a')
139
self.assertPathExists('b')
140
self.check_file_contents('b', 'something in a\n')
142
self.create_file('a', 'new something in a\n')
143
self._fancy_rename('b', 'a')
145
self.check_file_contents('a', 'something in a\n')
147
def test_fancy_rename_fails_source_missing(self):
148
# An exception should be raised, and the target should be left in place
149
self.create_file('target', 'data in target\n')
150
self.assertRaises((IOError, OSError), self._fancy_rename,
151
'missingsource', 'target')
152
self.assertPathExists('target')
153
self.check_file_contents('target', 'data in target\n')
155
def test_fancy_rename_fails_if_source_and_target_missing(self):
156
self.assertRaises((IOError, OSError), self._fancy_rename,
157
'missingsource', 'missingtarget')
159
def test_rename(self):
160
# Rename should be semi-atomic on all platforms
161
self.create_file('a', 'something in a\n')
162
osutils.rename('a', 'b')
163
self.assertPathDoesNotExist('a')
164
self.assertPathExists('b')
165
self.check_file_contents('b', 'something in a\n')
167
self.create_file('a', 'new something in a\n')
168
osutils.rename('b', 'a')
170
self.check_file_contents('a', 'something in a\n')
172
# TODO: test fancy_rename using a MemoryTransport
174
def test_rename_change_case(self):
175
# on Windows we should be able to change filename case by rename
176
self.build_tree(['a', 'b/'])
177
osutils.rename('a', 'A')
178
osutils.rename('b', 'B')
179
# we can't use failUnlessExists on case-insensitive filesystem
180
# so try to check shape of the tree
181
shape = sorted(os.listdir('.'))
182
self.assertEquals(['A', 'B'], shape)
185
class TestRandChars(tests.TestCase):
187
def test_01_rand_chars_empty(self):
188
result = osutils.rand_chars(0)
189
self.assertEqual(result, '')
191
def test_02_rand_chars_100(self):
192
result = osutils.rand_chars(100)
193
self.assertEqual(len(result), 100)
194
self.assertEqual(type(result), str)
195
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
198
class TestIsInside(tests.TestCase):
200
def test_is_inside(self):
201
is_inside = osutils.is_inside
202
self.assertTrue(is_inside('src', 'src/foo.c'))
203
self.assertFalse(is_inside('src', 'srccontrol'))
204
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
205
self.assertTrue(is_inside('foo.c', 'foo.c'))
206
self.assertFalse(is_inside('foo.c', ''))
207
self.assertTrue(is_inside('', 'foo.c'))
209
def test_is_inside_any(self):
210
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
211
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
212
(['src'], SRC_FOO_C),
215
self.assert_(osutils.is_inside_any(dirs, fn))
216
for dirs, fn in [(['src'], 'srccontrol'),
217
(['src'], 'srccontrol/foo')]:
218
self.assertFalse(osutils.is_inside_any(dirs, fn))
220
def test_is_inside_or_parent_of_any(self):
221
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
222
(['src'], 'src/foo.c'),
223
(['src/bar.c'], 'src'),
224
(['src/bar.c', 'bla/foo.c'], 'src'),
227
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
229
for dirs, fn in [(['src'], 'srccontrol'),
230
(['srccontrol/foo.c'], 'src'),
231
(['src'], 'srccontrol/foo')]:
232
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
235
class TestLstat(tests.TestCaseInTempDir):
237
def test_lstat_matches_fstat(self):
238
# On Windows, lstat and fstat don't always agree, primarily in the
239
# 'st_ino' and 'st_dev' fields. So we force them to be '0' in our
240
# custom implementation.
241
if sys.platform == 'win32':
242
# We only have special lstat/fstat if we have the extension.
243
# Without it, we may end up re-reading content when we don't have
244
# to, but otherwise it doesn't effect correctness.
245
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
246
f = open('test-file.txt', 'wb')
247
self.addCleanup(f.close)
248
f.write('some content\n')
250
self.assertEqualStat(osutils.fstat(f.fileno()),
251
osutils.lstat('test-file.txt'))
254
class TestRmTree(tests.TestCaseInTempDir):
256
def test_rmtree(self):
257
# Check to remove tree with read-only files/dirs
259
f = file('dir/file', 'w')
262
# would like to also try making the directory readonly, but at the
263
# moment python shutil.rmtree doesn't handle that properly - it would
264
# need to chmod the directory before removing things inside it - deferred
265
# for now -- mbp 20060505
266
# osutils.make_readonly('dir')
267
osutils.make_readonly('dir/file')
269
osutils.rmtree('dir')
271
self.assertPathDoesNotExist('dir/file')
272
self.assertPathDoesNotExist('dir')
275
class TestDeleteAny(tests.TestCaseInTempDir):
277
def test_delete_any_readonly(self):
278
# from <https://bugs.launchpad.net/bzr/+bug/218206>
279
self.build_tree(['d/', 'f'])
280
osutils.make_readonly('d')
281
osutils.make_readonly('f')
283
osutils.delete_any('f')
284
osutils.delete_any('d')
287
class TestKind(tests.TestCaseInTempDir):
289
def test_file_kind(self):
290
self.build_tree(['file', 'dir/'])
291
self.assertEquals('file', osutils.file_kind('file'))
292
self.assertEquals('directory', osutils.file_kind('dir/'))
293
if osutils.has_symlinks():
294
os.symlink('symlink', 'symlink')
295
self.assertEquals('symlink', osutils.file_kind('symlink'))
297
# TODO: jam 20060529 Test a block device
299
os.lstat('/dev/null')
301
if e.errno not in (errno.ENOENT,):
304
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
306
mkfifo = getattr(os, 'mkfifo', None)
310
self.assertEquals('fifo', osutils.file_kind('fifo'))
314
AF_UNIX = getattr(socket, 'AF_UNIX', None)
316
s = socket.socket(AF_UNIX)
319
self.assertEquals('socket', osutils.file_kind('socket'))
323
def test_kind_marker(self):
324
self.assertEqual("", osutils.kind_marker("file"))
325
self.assertEqual("/", osutils.kind_marker('directory'))
326
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
327
self.assertEqual("@", osutils.kind_marker("symlink"))
328
self.assertEqual("+", osutils.kind_marker("tree-reference"))
329
self.assertEqual("", osutils.kind_marker("fifo"))
330
self.assertEqual("", osutils.kind_marker("socket"))
331
self.assertEqual("", osutils.kind_marker("unknown"))
334
class TestUmask(tests.TestCaseInTempDir):
336
def test_get_umask(self):
337
if sys.platform == 'win32':
338
# umask always returns '0', no way to set it
339
self.assertEqual(0, osutils.get_umask())
342
orig_umask = osutils.get_umask()
343
self.addCleanup(os.umask, orig_umask)
345
self.assertEqual(0222, osutils.get_umask())
347
self.assertEqual(0022, osutils.get_umask())
349
self.assertEqual(0002, osutils.get_umask())
351
self.assertEqual(0027, osutils.get_umask())
354
class TestDateTime(tests.TestCase):
356
def assertFormatedDelta(self, expected, seconds):
357
"""Assert osutils.format_delta formats as expected"""
358
actual = osutils.format_delta(seconds)
359
self.assertEqual(expected, actual)
361
def test_format_delta(self):
362
self.assertFormatedDelta('0 seconds ago', 0)
363
self.assertFormatedDelta('1 second ago', 1)
364
self.assertFormatedDelta('10 seconds ago', 10)
365
self.assertFormatedDelta('59 seconds ago', 59)
366
self.assertFormatedDelta('89 seconds ago', 89)
367
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
368
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
369
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
370
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
371
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
372
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
373
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
374
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
375
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
376
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
377
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
378
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
379
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
380
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
381
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
382
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
383
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
384
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
386
# We handle when time steps the wrong direction because computers
387
# don't have synchronized clocks.
388
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
389
self.assertFormatedDelta('1 second in the future', -1)
390
self.assertFormatedDelta('2 seconds in the future', -2)
392
def test_format_date(self):
393
self.assertRaises(errors.UnsupportedTimezoneFormat,
394
osutils.format_date, 0, timezone='foo')
395
self.assertIsInstance(osutils.format_date(0), str)
396
self.assertIsInstance(osutils.format_local_date(0), unicode)
397
# Testing for the actual value of the local weekday without
398
# duplicating the code from format_date is difficult.
399
# Instead blackbox.test_locale should check for localized
400
# dates once they do occur in output strings.
402
def test_format_date_with_offset_in_original_timezone(self):
403
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
404
osutils.format_date_with_offset_in_original_timezone(0))
405
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
406
osutils.format_date_with_offset_in_original_timezone(100000))
407
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
408
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
410
def test_local_time_offset(self):
411
"""Test that local_time_offset() returns a sane value."""
412
offset = osutils.local_time_offset()
413
self.assertTrue(isinstance(offset, int))
414
# Test that the offset is no more than a eighteen hours in
416
# Time zone handling is system specific, so it is difficult to
417
# do more specific tests, but a value outside of this range is
419
eighteen_hours = 18 * 3600
420
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
422
def test_local_time_offset_with_timestamp(self):
423
"""Test that local_time_offset() works with a timestamp."""
424
offset = osutils.local_time_offset(1000000000.1234567)
425
self.assertTrue(isinstance(offset, int))
426
eighteen_hours = 18 * 3600
427
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
430
class TestLinks(tests.TestCaseInTempDir):
432
def test_dereference_path(self):
433
self.requireFeature(features.SymlinkFeature)
434
cwd = osutils.realpath('.')
436
bar_path = osutils.pathjoin(cwd, 'bar')
437
# Using './' to avoid bug #1213894 (first path component not
438
# dereferenced) in Python 2.4.1 and earlier
439
self.assertEqual(bar_path, osutils.realpath('./bar'))
440
os.symlink('bar', 'foo')
441
self.assertEqual(bar_path, osutils.realpath('./foo'))
443
# Does not dereference terminal symlinks
444
foo_path = osutils.pathjoin(cwd, 'foo')
445
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
447
# Dereferences parent symlinks
449
baz_path = osutils.pathjoin(bar_path, 'baz')
450
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
452
# Dereferences parent symlinks that are the first path element
453
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
455
# Dereferences parent symlinks in absolute paths
456
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
457
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
459
def test_changing_access(self):
460
f = file('file', 'w')
464
# Make a file readonly
465
osutils.make_readonly('file')
466
mode = os.lstat('file').st_mode
467
self.assertEqual(mode, mode & 0777555)
469
# Make a file writable
470
osutils.make_writable('file')
471
mode = os.lstat('file').st_mode
472
self.assertEqual(mode, mode | 0200)
474
if osutils.has_symlinks():
475
# should not error when handed a symlink
476
os.symlink('nonexistent', 'dangling')
477
osutils.make_readonly('dangling')
478
osutils.make_writable('dangling')
480
def test_host_os_dereferences_symlinks(self):
481
osutils.host_os_dereferences_symlinks()
484
class TestCanonicalRelPath(tests.TestCaseInTempDir):
486
_test_needs_features = [features.CaseInsCasePresFilenameFeature]
488
def test_canonical_relpath_simple(self):
489
f = file('MixedCaseName', 'w')
491
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
492
self.assertEqual('work/MixedCaseName', actual)
494
def test_canonical_relpath_missing_tail(self):
495
os.mkdir('MixedCaseParent')
496
actual = osutils.canonical_relpath(self.test_base_dir,
497
'mixedcaseparent/nochild')
498
self.assertEqual('work/MixedCaseParent/nochild', actual)
501
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
503
def assertRelpath(self, expected, base, path):
504
actual = osutils._cicp_canonical_relpath(base, path)
505
self.assertEqual(expected, actual)
507
def test_simple(self):
508
self.build_tree(['MixedCaseName'])
509
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
510
self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
512
def test_subdir_missing_tail(self):
513
self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
514
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
515
self.assertRelpath('MixedCaseParent/a_child', base,
516
'MixedCaseParent/a_child')
517
self.assertRelpath('MixedCaseParent/a_child', base,
518
'MixedCaseParent/A_Child')
519
self.assertRelpath('MixedCaseParent/not_child', base,
520
'MixedCaseParent/not_child')
522
def test_at_root_slash(self):
523
# We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
525
if osutils.MIN_ABS_PATHLENGTH > 1:
526
raise tests.TestSkipped('relpath requires %d chars'
527
% osutils.MIN_ABS_PATHLENGTH)
528
self.assertRelpath('foo', '/', '/foo')
530
def test_at_root_drive(self):
531
if sys.platform != 'win32':
532
raise tests.TestNotApplicable('we can only test drive-letter relative'
533
' paths on Windows where we have drive'
536
# The specific issue is that when at the root of a drive, 'abspath'
537
# returns "C:/" or just "/". However, the code assumes that abspath
538
# always returns something like "C:/foo" or "/foo" (no trailing slash).
539
self.assertRelpath('foo', 'C:/', 'C:/foo')
540
self.assertRelpath('foo', 'X:/', 'X:/foo')
541
self.assertRelpath('foo', 'X:/', 'X://foo')
544
class TestPumpFile(tests.TestCase):
545
"""Test pumpfile method."""
548
tests.TestCase.setUp(self)
549
# create a test datablock
550
self.block_size = 512
551
pattern = '0123456789ABCDEF'
552
self.test_data = pattern * (3 * self.block_size / len(pattern))
553
self.test_data_len = len(self.test_data)
555
def test_bracket_block_size(self):
556
"""Read data in blocks with the requested read size bracketing the
558
# make sure test data is larger than max read size
559
self.assertTrue(self.test_data_len > self.block_size)
561
from_file = file_utils.FakeReadFile(self.test_data)
564
# read (max / 2) bytes and verify read size wasn't affected
565
num_bytes_to_read = self.block_size / 2
566
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
567
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
568
self.assertEqual(from_file.get_read_count(), 1)
570
# read (max) bytes and verify read size wasn't affected
571
num_bytes_to_read = self.block_size
572
from_file.reset_read_count()
573
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
574
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
575
self.assertEqual(from_file.get_read_count(), 1)
577
# read (max + 1) bytes and verify read size was limited
578
num_bytes_to_read = self.block_size + 1
579
from_file.reset_read_count()
580
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
581
self.assertEqual(from_file.get_max_read_size(), self.block_size)
582
self.assertEqual(from_file.get_read_count(), 2)
584
# finish reading the rest of the data
585
num_bytes_to_read = self.test_data_len - to_file.tell()
586
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
588
# report error if the data wasn't equal (we only report the size due
589
# to the length of the data)
590
response_data = to_file.getvalue()
591
if response_data != self.test_data:
592
message = "Data not equal. Expected %d bytes, received %d."
593
self.fail(message % (len(response_data), self.test_data_len))
595
def test_specified_size(self):
596
"""Request a transfer larger than the maximum block size and verify
597
that the maximum read doesn't exceed the block_size."""
598
# make sure test data is larger than max read size
599
self.assertTrue(self.test_data_len > self.block_size)
601
# retrieve data in blocks
602
from_file = file_utils.FakeReadFile(self.test_data)
604
osutils.pumpfile(from_file, to_file, self.test_data_len,
607
# verify read size was equal to the maximum read size
608
self.assertTrue(from_file.get_max_read_size() > 0)
609
self.assertEqual(from_file.get_max_read_size(), self.block_size)
610
self.assertEqual(from_file.get_read_count(), 3)
612
# report error if the data wasn't equal (we only report the size due
613
# to the length of the data)
614
response_data = to_file.getvalue()
615
if response_data != self.test_data:
616
message = "Data not equal. Expected %d bytes, received %d."
617
self.fail(message % (len(response_data), self.test_data_len))
619
def test_to_eof(self):
620
"""Read to end-of-file and verify that the reads are not larger than
621
the maximum read size."""
622
# make sure test data is larger than max read size
623
self.assertTrue(self.test_data_len > self.block_size)
625
# retrieve data to EOF
626
from_file = file_utils.FakeReadFile(self.test_data)
628
osutils.pumpfile(from_file, to_file, -1, self.block_size)
630
# verify read size was equal to the maximum read size
631
self.assertEqual(from_file.get_max_read_size(), self.block_size)
632
self.assertEqual(from_file.get_read_count(), 4)
634
# report error if the data wasn't equal (we only report the size due
635
# to the length of the data)
636
response_data = to_file.getvalue()
637
if response_data != self.test_data:
638
message = "Data not equal. Expected %d bytes, received %d."
639
self.fail(message % (len(response_data), self.test_data_len))
641
def test_defaults(self):
642
"""Verifies that the default arguments will read to EOF -- this
643
test verifies that any existing usages of pumpfile will not be broken
644
with this new version."""
645
# retrieve data using default (old) pumpfile method
646
from_file = file_utils.FakeReadFile(self.test_data)
648
osutils.pumpfile(from_file, to_file)
650
# report error if the data wasn't equal (we only report the size due
651
# to the length of the data)
652
response_data = to_file.getvalue()
653
if response_data != self.test_data:
654
message = "Data not equal. Expected %d bytes, received %d."
655
self.fail(message % (len(response_data), self.test_data_len))
657
def test_report_activity(self):
659
def log_activity(length, direction):
660
activity.append((length, direction))
661
from_file = StringIO(self.test_data)
663
osutils.pumpfile(from_file, to_file, buff_size=500,
664
report_activity=log_activity, direction='read')
665
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
666
(36, 'read')], activity)
668
from_file = StringIO(self.test_data)
671
osutils.pumpfile(from_file, to_file, buff_size=500,
672
report_activity=log_activity, direction='write')
673
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
674
(36, 'write')], activity)
676
# And with a limited amount of data
677
from_file = StringIO(self.test_data)
680
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
681
report_activity=log_activity, direction='read')
682
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
686
class TestPumpStringFile(tests.TestCase):
688
def test_empty(self):
690
osutils.pump_string_file("", output)
691
self.assertEqual("", output.getvalue())
693
def test_more_than_segment_size(self):
695
osutils.pump_string_file("123456789", output, 2)
696
self.assertEqual("123456789", output.getvalue())
698
def test_segment_size(self):
700
osutils.pump_string_file("12", output, 2)
701
self.assertEqual("12", output.getvalue())
703
def test_segment_size_multiple(self):
705
osutils.pump_string_file("1234", output, 2)
706
self.assertEqual("1234", output.getvalue())
709
class TestRelpath(tests.TestCase):
711
def test_simple_relpath(self):
712
cwd = osutils.getcwd()
713
subdir = cwd + '/subdir'
714
self.assertEqual('subdir', osutils.relpath(cwd, subdir))
716
def test_deep_relpath(self):
717
cwd = osutils.getcwd()
718
subdir = cwd + '/sub/subsubdir'
719
self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
721
def test_not_relative(self):
722
self.assertRaises(errors.PathNotChild,
723
osutils.relpath, 'C:/path', 'H:/path')
724
self.assertRaises(errors.PathNotChild,
725
osutils.relpath, 'C:/', 'H:/path')
728
class TestSafeUnicode(tests.TestCase):
730
def test_from_ascii_string(self):
731
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
733
def test_from_unicode_string_ascii_contents(self):
734
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
736
def test_from_unicode_string_unicode_contents(self):
737
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
739
def test_from_utf8_string(self):
740
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
742
def test_bad_utf8_string(self):
743
self.assertRaises(errors.BzrBadParameterNotUnicode,
744
osutils.safe_unicode,
748
class TestSafeUtf8(tests.TestCase):
750
def test_from_ascii_string(self):
752
self.assertEqual('foobar', osutils.safe_utf8(f))
754
def test_from_unicode_string_ascii_contents(self):
755
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
757
def test_from_unicode_string_unicode_contents(self):
758
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
760
def test_from_utf8_string(self):
761
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
763
def test_bad_utf8_string(self):
764
self.assertRaises(errors.BzrBadParameterNotUnicode,
765
osutils.safe_utf8, '\xbb\xbb')
768
class TestSafeRevisionId(tests.TestCase):
770
def test_from_ascii_string(self):
771
# this shouldn't give a warning because it's getting an ascii string
772
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
774
def test_from_unicode_string_ascii_contents(self):
775
self.assertEqual('bargam',
776
osutils.safe_revision_id(u'bargam', warn=False))
778
def test_from_unicode_deprecated(self):
779
self.assertEqual('bargam',
780
self.callDeprecated([osutils._revision_id_warning],
781
osutils.safe_revision_id, u'bargam'))
783
def test_from_unicode_string_unicode_contents(self):
784
self.assertEqual('bargam\xc2\xae',
785
osutils.safe_revision_id(u'bargam\xae', warn=False))
787
def test_from_utf8_string(self):
788
self.assertEqual('foo\xc2\xae',
789
osutils.safe_revision_id('foo\xc2\xae'))
792
"""Currently, None is a valid revision_id"""
793
self.assertEqual(None, osutils.safe_revision_id(None))
796
class TestSafeFileId(tests.TestCase):
798
def test_from_ascii_string(self):
799
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
801
def test_from_unicode_string_ascii_contents(self):
802
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
804
def test_from_unicode_deprecated(self):
805
self.assertEqual('bargam',
806
self.callDeprecated([osutils._file_id_warning],
807
osutils.safe_file_id, u'bargam'))
809
def test_from_unicode_string_unicode_contents(self):
810
self.assertEqual('bargam\xc2\xae',
811
osutils.safe_file_id(u'bargam\xae', warn=False))
813
def test_from_utf8_string(self):
814
self.assertEqual('foo\xc2\xae',
815
osutils.safe_file_id('foo\xc2\xae'))
818
"""Currently, None is a valid revision_id"""
819
self.assertEqual(None, osutils.safe_file_id(None))
822
class TestPosixFuncs(tests.TestCase):
823
"""Test that the posix version of normpath returns an appropriate path
824
when used with 2 leading slashes."""
826
def test_normpath(self):
827
self.assertEqual('/etc/shadow', osutils._posix_normpath('/etc/shadow'))
828
self.assertEqual('/etc/shadow', osutils._posix_normpath('//etc/shadow'))
829
self.assertEqual('/etc/shadow', osutils._posix_normpath('///etc/shadow'))
832
class TestWin32Funcs(tests.TestCase):
833
"""Test that _win32 versions of os utilities return appropriate paths."""
835
def test_abspath(self):
836
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
837
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
838
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
839
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
841
def test_realpath(self):
842
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
843
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
845
def test_pathjoin(self):
846
self.assertEqual('path/to/foo',
847
osutils._win32_pathjoin('path', 'to', 'foo'))
848
self.assertEqual('C:/foo',
849
osutils._win32_pathjoin('path\\to', 'C:\\foo'))
850
self.assertEqual('C:/foo',
851
osutils._win32_pathjoin('path/to', 'C:/foo'))
852
self.assertEqual('path/to/foo',
853
osutils._win32_pathjoin('path/to/', 'foo'))
854
self.assertEqual('/foo',
855
osutils._win32_pathjoin('C:/path/to/', '/foo'))
856
self.assertEqual('/foo',
857
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
859
def test_normpath(self):
860
self.assertEqual('path/to/foo',
861
osutils._win32_normpath(r'path\\from\..\to\.\foo'))
862
self.assertEqual('path/to/foo',
863
osutils._win32_normpath('path//from/../to/./foo'))
865
def test_getcwd(self):
866
cwd = osutils._win32_getcwd()
867
os_cwd = os.getcwdu()
868
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
869
# win32 is inconsistent whether it returns lower or upper case
870
# and even if it was consistent the user might type the other
871
# so we force it to uppercase
872
# running python.exe under cmd.exe return capital C:\\
873
# running win32 python inside a cygwin shell returns lowercase
874
self.assertEqual(os_cwd[0].upper(), cwd[0])
876
def test_fixdrive(self):
877
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
878
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
879
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
881
def test_win98_abspath(self):
883
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
884
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
886
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
887
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
889
cwd = osutils.getcwd().rstrip('/')
890
drive = osutils.ntpath.splitdrive(cwd)[0]
891
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
892
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
895
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
898
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
899
"""Test win32 functions that create files."""
901
def test_getcwd(self):
902
self.requireFeature(features.UnicodeFilenameFeature)
905
# TODO: jam 20060427 This will probably fail on Mac OSX because
906
# it will change the normalization of B\xe5gfors
907
# Consider using a different unicode character, or make
908
# osutils.getcwd() renormalize the path.
909
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
911
def test_minimum_path_selection(self):
912
self.assertEqual(set(),
913
osutils.minimum_path_selection([]))
914
self.assertEqual(set(['a']),
915
osutils.minimum_path_selection(['a']))
916
self.assertEqual(set(['a', 'b']),
917
osutils.minimum_path_selection(['a', 'b']))
918
self.assertEqual(set(['a/', 'b']),
919
osutils.minimum_path_selection(['a/', 'b']))
920
self.assertEqual(set(['a/', 'b']),
921
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
922
self.assertEqual(set(['a-b', 'a', 'a0b']),
923
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
925
def test_mkdtemp(self):
926
tmpdir = osutils._win32_mkdtemp(dir='.')
927
self.assertFalse('\\' in tmpdir)
929
def test_rename(self):
937
osutils._win32_rename('b', 'a')
938
self.assertPathExists('a')
939
self.assertPathDoesNotExist('b')
940
self.assertFileEqual('baz\n', 'a')
942
def test_rename_missing_file(self):
948
osutils._win32_rename('b', 'a')
949
except (IOError, OSError), e:
950
self.assertEqual(errno.ENOENT, e.errno)
951
self.assertFileEqual('foo\n', 'a')
953
def test_rename_missing_dir(self):
956
osutils._win32_rename('b', 'a')
957
except (IOError, OSError), e:
958
self.assertEqual(errno.ENOENT, e.errno)
960
def test_rename_current_dir(self):
963
# You can't rename the working directory
964
# doing rename non-existant . usually
965
# just raises ENOENT, since non-existant
968
osutils._win32_rename('b', '.')
969
except (IOError, OSError), e:
970
self.assertEqual(errno.ENOENT, e.errno)
972
def test_splitpath(self):
973
def check(expected, path):
974
self.assertEqual(expected, osutils.splitpath(path))
977
check(['a', 'b'], 'a/b')
978
check(['a', 'b'], 'a/./b')
979
check(['a', '.b'], 'a/.b')
980
check(['a', '.b'], 'a\\.b')
982
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
985
class TestParentDirectories(tests.TestCaseInTempDir):
986
"""Test osutils.parent_directories()"""
988
def test_parent_directories(self):
989
self.assertEqual([], osutils.parent_directories('a'))
990
self.assertEqual(['a'], osutils.parent_directories('a/b'))
991
self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
994
class TestMacFuncsDirs(tests.TestCaseInTempDir):
995
"""Test mac special functions that require directories."""
997
def test_getcwd(self):
998
self.requireFeature(features.UnicodeFilenameFeature)
999
os.mkdir(u'B\xe5gfors')
1000
os.chdir(u'B\xe5gfors')
1001
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
1003
def test_getcwd_nonnorm(self):
1004
self.requireFeature(features.UnicodeFilenameFeature)
1005
# Test that _mac_getcwd() will normalize this path
1006
os.mkdir(u'Ba\u030agfors')
1007
os.chdir(u'Ba\u030agfors')
1008
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
1011
class TestChunksToLines(tests.TestCase):
1013
def test_smoketest(self):
1014
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1015
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
1016
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
1017
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
1019
def test_osutils_binding(self):
1020
from bzrlib.tests import test__chunks_to_lines
1021
if test__chunks_to_lines.compiled_chunkstolines_feature.available():
1022
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
1024
from bzrlib._chunks_to_lines_py import chunks_to_lines
1025
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1028
class TestSplitLines(tests.TestCase):
1030
def test_split_unicode(self):
1031
self.assertEqual([u'foo\n', u'bar\xae'],
1032
osutils.split_lines(u'foo\nbar\xae'))
1033
self.assertEqual([u'foo\n', u'bar\xae\n'],
1034
osutils.split_lines(u'foo\nbar\xae\n'))
1036
def test_split_with_carriage_returns(self):
1037
self.assertEqual(['foo\rbar\n'],
1038
osutils.split_lines('foo\rbar\n'))
1041
class TestWalkDirs(tests.TestCaseInTempDir):
1043
def assertExpectedBlocks(self, expected, result):
1044
self.assertEqual(expected,
1045
[(dirinfo, [line[0:3] for line in block])
1046
for dirinfo, block in result])
1048
def test_walkdirs(self):
1057
self.build_tree(tree)
1058
expected_dirblocks = [
1060
[('0file', '0file', 'file'),
1061
('1dir', '1dir', 'directory'),
1062
('2file', '2file', 'file'),
1065
(('1dir', './1dir'),
1066
[('1dir/0file', '0file', 'file'),
1067
('1dir/1dir', '1dir', 'directory'),
1070
(('1dir/1dir', './1dir/1dir'),
1076
found_bzrdir = False
1077
for dirdetail, dirblock in osutils.walkdirs('.'):
1078
if len(dirblock) and dirblock[0][1] == '.bzr':
1079
# this tests the filtering of selected paths
1082
result.append((dirdetail, dirblock))
1084
self.assertTrue(found_bzrdir)
1085
self.assertExpectedBlocks(expected_dirblocks, result)
1086
# you can search a subdir only, with a supplied prefix.
1088
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1089
result.append(dirblock)
1090
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1092
def test_walkdirs_os_error(self):
1093
# <https://bugs.launchpad.net/bzr/+bug/338653>
1094
# Pyrex readdir didn't raise useful messages if it had an error
1095
# reading the directory
1096
if sys.platform == 'win32':
1097
raise tests.TestNotApplicable(
1098
"readdir IOError not tested on win32")
1099
self.requireFeature(features.not_running_as_root)
1100
os.mkdir("test-unreadable")
1101
os.chmod("test-unreadable", 0000)
1102
# must chmod it back so that it can be removed
1103
self.addCleanup(os.chmod, "test-unreadable", 0700)
1104
# The error is not raised until the generator is actually evaluated.
1105
# (It would be ok if it happened earlier but at the moment it
1107
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1108
self.assertEquals('./test-unreadable', e.filename)
1109
self.assertEquals(errno.EACCES, e.errno)
1110
# Ensure the message contains the file name
1111
self.assertContainsRe(str(e), "\./test-unreadable")
1114
def test_walkdirs_encoding_error(self):
1115
# <https://bugs.launchpad.net/bzr/+bug/488519>
1116
# walkdirs didn't raise a useful message when the filenames
1117
# are not using the filesystem's encoding
1119
# require a bytestring based filesystem
1120
self.requireFeature(features.ByteStringNamedFilesystem)
1131
self.build_tree(tree)
1133
# rename the 1file to a latin-1 filename
1134
os.rename("./1file", "\xe8file")
1135
if "\xe8file" not in os.listdir("."):
1136
self.skip("Lack filesystem that preserves arbitrary bytes")
1138
self._save_platform_info()
1139
win32utils.winver = None # Avoid the win32 detection code
1140
osutils._fs_enc = 'UTF-8'
1142
# this should raise on error
1144
for dirdetail, dirblock in osutils.walkdirs('.'):
1147
self.assertRaises(errors.BadFilenameEncoding, attempt)
1149
def test__walkdirs_utf8(self):
1158
self.build_tree(tree)
1159
expected_dirblocks = [
1161
[('0file', '0file', 'file'),
1162
('1dir', '1dir', 'directory'),
1163
('2file', '2file', 'file'),
1166
(('1dir', './1dir'),
1167
[('1dir/0file', '0file', 'file'),
1168
('1dir/1dir', '1dir', 'directory'),
1171
(('1dir/1dir', './1dir/1dir'),
1177
found_bzrdir = False
1178
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1179
if len(dirblock) and dirblock[0][1] == '.bzr':
1180
# this tests the filtering of selected paths
1183
result.append((dirdetail, dirblock))
1185
self.assertTrue(found_bzrdir)
1186
self.assertExpectedBlocks(expected_dirblocks, result)
1188
# you can search a subdir only, with a supplied prefix.
1190
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1191
result.append(dirblock)
1192
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1194
def _filter_out_stat(self, result):
1195
"""Filter out the stat value from the walkdirs result"""
1196
for dirdetail, dirblock in result:
1198
for info in dirblock:
1199
# Ignore info[3] which is the stat
1200
new_dirblock.append((info[0], info[1], info[2], info[4]))
1201
dirblock[:] = new_dirblock
1203
def _save_platform_info(self):
1204
self.overrideAttr(win32utils, 'winver')
1205
self.overrideAttr(osutils, '_fs_enc')
1206
self.overrideAttr(osutils, '_selected_dir_reader')
1208
def assertDirReaderIs(self, expected):
1209
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1210
# Force it to redetect
1211
osutils._selected_dir_reader = None
1212
# Nothing to list, but should still trigger the selection logic
1213
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1214
self.assertIsInstance(osutils._selected_dir_reader, expected)
1216
def test_force_walkdirs_utf8_fs_utf8(self):
1217
self.requireFeature(UTF8DirReaderFeature)
1218
self._save_platform_info()
1219
win32utils.winver = None # Avoid the win32 detection code
1220
osutils._fs_enc = 'utf-8'
1221
self.assertDirReaderIs(
1222
UTF8DirReaderFeature.module.UTF8DirReader)
1224
def test_force_walkdirs_utf8_fs_ascii(self):
1225
self.requireFeature(UTF8DirReaderFeature)
1226
self._save_platform_info()
1227
win32utils.winver = None # Avoid the win32 detection code
1228
osutils._fs_enc = 'ascii'
1229
self.assertDirReaderIs(
1230
UTF8DirReaderFeature.module.UTF8DirReader)
1232
def test_force_walkdirs_utf8_fs_latin1(self):
1233
self._save_platform_info()
1234
win32utils.winver = None # Avoid the win32 detection code
1235
osutils._fs_enc = 'iso-8859-1'
1236
self.assertDirReaderIs(osutils.UnicodeDirReader)
1238
def test_force_walkdirs_utf8_nt(self):
1239
# Disabled because the thunk of the whole walkdirs api is disabled.
1240
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1241
self._save_platform_info()
1242
win32utils.winver = 'Windows NT'
1243
from bzrlib._walkdirs_win32 import Win32ReadDir
1244
self.assertDirReaderIs(Win32ReadDir)
1246
def test_force_walkdirs_utf8_98(self):
1247
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1248
self._save_platform_info()
1249
win32utils.winver = 'Windows 98'
1250
self.assertDirReaderIs(osutils.UnicodeDirReader)
1252
def test_unicode_walkdirs(self):
1253
"""Walkdirs should always return unicode paths."""
1254
self.requireFeature(features.UnicodeFilenameFeature)
1255
name0 = u'0file-\xb6'
1256
name1 = u'1dir-\u062c\u0648'
1257
name2 = u'2file-\u0633'
1261
name1 + '/' + name0,
1262
name1 + '/' + name1 + '/',
1265
self.build_tree(tree)
1266
expected_dirblocks = [
1268
[(name0, name0, 'file', './' + name0),
1269
(name1, name1, 'directory', './' + name1),
1270
(name2, name2, 'file', './' + name2),
1273
((name1, './' + name1),
1274
[(name1 + '/' + name0, name0, 'file', './' + name1
1276
(name1 + '/' + name1, name1, 'directory', './' + name1
1280
((name1 + '/' + name1, './' + name1 + '/' + name1),
1285
result = list(osutils.walkdirs('.'))
1286
self._filter_out_stat(result)
1287
self.assertEqual(expected_dirblocks, result)
1288
result = list(osutils.walkdirs(u'./'+name1, name1))
1289
self._filter_out_stat(result)
1290
self.assertEqual(expected_dirblocks[1:], result)
1292
def test_unicode__walkdirs_utf8(self):
1293
"""Walkdirs_utf8 should always return utf8 paths.
1295
The abspath portion might be in unicode or utf-8
1297
self.requireFeature(features.UnicodeFilenameFeature)
1298
name0 = u'0file-\xb6'
1299
name1 = u'1dir-\u062c\u0648'
1300
name2 = u'2file-\u0633'
1304
name1 + '/' + name0,
1305
name1 + '/' + name1 + '/',
1308
self.build_tree(tree)
1309
name0 = name0.encode('utf8')
1310
name1 = name1.encode('utf8')
1311
name2 = name2.encode('utf8')
1313
expected_dirblocks = [
1315
[(name0, name0, 'file', './' + name0),
1316
(name1, name1, 'directory', './' + name1),
1317
(name2, name2, 'file', './' + name2),
1320
((name1, './' + name1),
1321
[(name1 + '/' + name0, name0, 'file', './' + name1
1323
(name1 + '/' + name1, name1, 'directory', './' + name1
1327
((name1 + '/' + name1, './' + name1 + '/' + name1),
1333
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1334
# all abspaths are Unicode, and encode them back into utf8.
1335
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1336
self.assertIsInstance(dirdetail[0], str)
1337
if isinstance(dirdetail[1], unicode):
1338
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1339
dirblock = [list(info) for info in dirblock]
1340
for info in dirblock:
1341
self.assertIsInstance(info[4], unicode)
1342
info[4] = info[4].encode('utf8')
1344
for info in dirblock:
1345
self.assertIsInstance(info[0], str)
1346
self.assertIsInstance(info[1], str)
1347
self.assertIsInstance(info[4], str)
1348
# Remove the stat information
1349
new_dirblock.append((info[0], info[1], info[2], info[4]))
1350
result.append((dirdetail, new_dirblock))
1351
self.assertEqual(expected_dirblocks, result)
1353
def test__walkdirs_utf8_with_unicode_fs(self):
1354
"""UnicodeDirReader should be a safe fallback everywhere
1356
The abspath portion should be in unicode
1358
self.requireFeature(features.UnicodeFilenameFeature)
1359
# Use the unicode reader. TODO: split into driver-and-driven unit
1361
self._save_platform_info()
1362
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1363
name0u = u'0file-\xb6'
1364
name1u = u'1dir-\u062c\u0648'
1365
name2u = u'2file-\u0633'
1369
name1u + '/' + name0u,
1370
name1u + '/' + name1u + '/',
1373
self.build_tree(tree)
1374
name0 = name0u.encode('utf8')
1375
name1 = name1u.encode('utf8')
1376
name2 = name2u.encode('utf8')
1378
# All of the abspaths should be in unicode, all of the relative paths
1380
expected_dirblocks = [
1382
[(name0, name0, 'file', './' + name0u),
1383
(name1, name1, 'directory', './' + name1u),
1384
(name2, name2, 'file', './' + name2u),
1387
((name1, './' + name1u),
1388
[(name1 + '/' + name0, name0, 'file', './' + name1u
1390
(name1 + '/' + name1, name1, 'directory', './' + name1u
1394
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1399
result = list(osutils._walkdirs_utf8('.'))
1400
self._filter_out_stat(result)
1401
self.assertEqual(expected_dirblocks, result)
1403
def test__walkdirs_utf8_win32readdir(self):
1404
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1405
self.requireFeature(features.UnicodeFilenameFeature)
1406
from bzrlib._walkdirs_win32 import Win32ReadDir
1407
self._save_platform_info()
1408
osutils._selected_dir_reader = Win32ReadDir()
1409
name0u = u'0file-\xb6'
1410
name1u = u'1dir-\u062c\u0648'
1411
name2u = u'2file-\u0633'
1415
name1u + '/' + name0u,
1416
name1u + '/' + name1u + '/',
1419
self.build_tree(tree)
1420
name0 = name0u.encode('utf8')
1421
name1 = name1u.encode('utf8')
1422
name2 = name2u.encode('utf8')
1424
# All of the abspaths should be in unicode, all of the relative paths
1426
expected_dirblocks = [
1428
[(name0, name0, 'file', './' + name0u),
1429
(name1, name1, 'directory', './' + name1u),
1430
(name2, name2, 'file', './' + name2u),
1433
((name1, './' + name1u),
1434
[(name1 + '/' + name0, name0, 'file', './' + name1u
1436
(name1 + '/' + name1, name1, 'directory', './' + name1u
1440
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1445
result = list(osutils._walkdirs_utf8(u'.'))
1446
self._filter_out_stat(result)
1447
self.assertEqual(expected_dirblocks, result)
1449
def assertStatIsCorrect(self, path, win32stat):
1450
os_stat = os.stat(path)
1451
self.assertEqual(os_stat.st_size, win32stat.st_size)
1452
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1453
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1454
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1455
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1456
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1457
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1459
def test__walkdirs_utf_win32_find_file_stat_file(self):
1460
"""make sure our Stat values are valid"""
1461
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1462
self.requireFeature(features.UnicodeFilenameFeature)
1463
from bzrlib._walkdirs_win32 import Win32ReadDir
1464
name0u = u'0file-\xb6'
1465
name0 = name0u.encode('utf8')
1466
self.build_tree([name0u])
1467
# I hate to sleep() here, but I'm trying to make the ctime different
1470
f = open(name0u, 'ab')
1472
f.write('just a small update')
1476
result = Win32ReadDir().read_dir('', u'.')
1478
self.assertEqual((name0, name0, 'file'), entry[:3])
1479
self.assertEqual(u'./' + name0u, entry[4])
1480
self.assertStatIsCorrect(entry[4], entry[3])
1481
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1483
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1484
"""make sure our Stat values are valid"""
1485
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1486
self.requireFeature(features.UnicodeFilenameFeature)
1487
from bzrlib._walkdirs_win32 import Win32ReadDir
1488
name0u = u'0dir-\u062c\u0648'
1489
name0 = name0u.encode('utf8')
1490
self.build_tree([name0u + '/'])
1492
result = Win32ReadDir().read_dir('', u'.')
1494
self.assertEqual((name0, name0, 'directory'), entry[:3])
1495
self.assertEqual(u'./' + name0u, entry[4])
1496
self.assertStatIsCorrect(entry[4], entry[3])
1498
def assertPathCompare(self, path_less, path_greater):
1499
"""check that path_less and path_greater compare correctly."""
1500
self.assertEqual(0, osutils.compare_paths_prefix_order(
1501
path_less, path_less))
1502
self.assertEqual(0, osutils.compare_paths_prefix_order(
1503
path_greater, path_greater))
1504
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1505
path_less, path_greater))
1506
self.assertEqual(1, osutils.compare_paths_prefix_order(
1507
path_greater, path_less))
1509
def test_compare_paths_prefix_order(self):
1510
# root before all else
1511
self.assertPathCompare("/", "/a")
1512
# alpha within a dir
1513
self.assertPathCompare("/a", "/b")
1514
self.assertPathCompare("/b", "/z")
1515
# high dirs before lower.
1516
self.assertPathCompare("/z", "/a/a")
1517
# except if the deeper dir should be output first
1518
self.assertPathCompare("/a/b/c", "/d/g")
1519
# lexical betwen dirs of the same height
1520
self.assertPathCompare("/a/z", "/z/z")
1521
self.assertPathCompare("/a/c/z", "/a/d/e")
1523
# this should also be consistent for no leading / paths
1524
# root before all else
1525
self.assertPathCompare("", "a")
1526
# alpha within a dir
1527
self.assertPathCompare("a", "b")
1528
self.assertPathCompare("b", "z")
1529
# high dirs before lower.
1530
self.assertPathCompare("z", "a/a")
1531
# except if the deeper dir should be output first
1532
self.assertPathCompare("a/b/c", "d/g")
1533
# lexical betwen dirs of the same height
1534
self.assertPathCompare("a/z", "z/z")
1535
self.assertPathCompare("a/c/z", "a/d/e")
1537
def test_path_prefix_sorting(self):
1538
"""Doing a sort on path prefix should match our sample data."""
1553
dir_sorted_paths = [
1569
sorted(original_paths, key=osutils.path_prefix_key))
1570
# using the comparison routine shoudl work too:
1573
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1576
class TestCopyTree(tests.TestCaseInTempDir):
1578
def test_copy_basic_tree(self):
1579
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1580
osutils.copy_tree('source', 'target')
1581
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1582
self.assertEqual(['c'], os.listdir('target/b'))
1584
def test_copy_tree_target_exists(self):
1585
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1587
osutils.copy_tree('source', 'target')
1588
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1589
self.assertEqual(['c'], os.listdir('target/b'))
1591
def test_copy_tree_symlinks(self):
1592
self.requireFeature(features.SymlinkFeature)
1593
self.build_tree(['source/'])
1594
os.symlink('a/generic/path', 'source/lnk')
1595
osutils.copy_tree('source', 'target')
1596
self.assertEqual(['lnk'], os.listdir('target'))
1597
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1599
def test_copy_tree_handlers(self):
1600
processed_files = []
1601
processed_links = []
1602
def file_handler(from_path, to_path):
1603
processed_files.append(('f', from_path, to_path))
1604
def dir_handler(from_path, to_path):
1605
processed_files.append(('d', from_path, to_path))
1606
def link_handler(from_path, to_path):
1607
processed_links.append((from_path, to_path))
1608
handlers = {'file':file_handler,
1609
'directory':dir_handler,
1610
'symlink':link_handler,
1613
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1614
if osutils.has_symlinks():
1615
os.symlink('a/generic/path', 'source/lnk')
1616
osutils.copy_tree('source', 'target', handlers=handlers)
1618
self.assertEqual([('d', 'source', 'target'),
1619
('f', 'source/a', 'target/a'),
1620
('d', 'source/b', 'target/b'),
1621
('f', 'source/b/c', 'target/b/c'),
1623
self.assertPathDoesNotExist('target')
1624
if osutils.has_symlinks():
1625
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1628
class TestSetUnsetEnv(tests.TestCase):
1629
"""Test updating the environment"""
1632
super(TestSetUnsetEnv, self).setUp()
1634
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1635
'Environment was not cleaned up properly.'
1636
' Variable BZR_TEST_ENV_VAR should not exist.')
1638
if 'BZR_TEST_ENV_VAR' in os.environ:
1639
del os.environ['BZR_TEST_ENV_VAR']
1640
self.addCleanup(cleanup)
1643
"""Test that we can set an env variable"""
1644
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1645
self.assertEqual(None, old)
1646
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1648
def test_double_set(self):
1649
"""Test that we get the old value out"""
1650
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1651
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1652
self.assertEqual('foo', old)
1653
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1655
def test_unicode(self):
1656
"""Environment can only contain plain strings
1658
So Unicode strings must be encoded.
1660
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1662
raise tests.TestSkipped(
1663
'Cannot find a unicode character that works in encoding %s'
1664
% (osutils.get_user_encoding(),))
1666
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1667
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1669
def test_unset(self):
1670
"""Test that passing None will remove the env var"""
1671
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1672
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1673
self.assertEqual('foo', old)
1674
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1675
self.assertFalse('BZR_TEST_ENV_VAR' in os.environ)
1678
class TestSizeShaFile(tests.TestCaseInTempDir):
1680
def test_sha_empty(self):
1681
self.build_tree_contents([('foo', '')])
1682
expected_sha = osutils.sha_string('')
1684
self.addCleanup(f.close)
1685
size, sha = osutils.size_sha_file(f)
1686
self.assertEqual(0, size)
1687
self.assertEqual(expected_sha, sha)
1689
def test_sha_mixed_endings(self):
1690
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1691
self.build_tree_contents([('foo', text)])
1692
expected_sha = osutils.sha_string(text)
1693
f = open('foo', 'rb')
1694
self.addCleanup(f.close)
1695
size, sha = osutils.size_sha_file(f)
1696
self.assertEqual(38, size)
1697
self.assertEqual(expected_sha, sha)
1700
class TestShaFileByName(tests.TestCaseInTempDir):
1702
def test_sha_empty(self):
1703
self.build_tree_contents([('foo', '')])
1704
expected_sha = osutils.sha_string('')
1705
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1707
def test_sha_mixed_endings(self):
1708
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1709
self.build_tree_contents([('foo', text)])
1710
expected_sha = osutils.sha_string(text)
1711
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1714
class TestResourceLoading(tests.TestCaseInTempDir):
1716
def test_resource_string(self):
1717
# test resource in bzrlib
1718
text = osutils.resource_string('bzrlib', 'debug.py')
1719
self.assertContainsRe(text, "debug_flags = set()")
1720
# test resource under bzrlib
1721
text = osutils.resource_string('bzrlib.ui', 'text.py')
1722
self.assertContainsRe(text, "class TextUIFactory")
1723
# test unsupported package
1724
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1726
# test unknown resource
1727
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1730
class TestReCompile(tests.TestCase):
1732
def _deprecated_re_compile_checked(self, *args, **kwargs):
1733
return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)),
1734
osutils.re_compile_checked, *args, **kwargs)
1736
def test_re_compile_checked(self):
1737
r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE)
1738
self.assertTrue(r.match('aaaa'))
1739
self.assertTrue(r.match('aAaA'))
1741
def test_re_compile_checked_error(self):
1742
# like https://bugs.launchpad.net/bzr/+bug/251352
1744
# Due to possible test isolation error, re.compile is not lazy at
1745
# this point. We re-install lazy compile.
1746
lazy_regex.install_lazy_compile()
1747
err = self.assertRaises(
1748
errors.BzrCommandError,
1749
self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case')
1751
'Invalid regular expression in test case: '
1752
'"*" nothing to repeat',
1756
class TestDirReader(tests.TestCaseInTempDir):
1758
scenarios = dir_reader_scenarios()
1761
_dir_reader_class = None
1762
_native_to_unicode = None
1765
tests.TestCaseInTempDir.setUp(self)
1766
self.overrideAttr(osutils,
1767
'_selected_dir_reader', self._dir_reader_class())
1769
def _get_ascii_tree(self):
1777
expected_dirblocks = [
1779
[('0file', '0file', 'file'),
1780
('1dir', '1dir', 'directory'),
1781
('2file', '2file', 'file'),
1784
(('1dir', './1dir'),
1785
[('1dir/0file', '0file', 'file'),
1786
('1dir/1dir', '1dir', 'directory'),
1789
(('1dir/1dir', './1dir/1dir'),
1794
return tree, expected_dirblocks
1796
def test_walk_cur_dir(self):
1797
tree, expected_dirblocks = self._get_ascii_tree()
1798
self.build_tree(tree)
1799
result = list(osutils._walkdirs_utf8('.'))
1800
# Filter out stat and abspath
1801
self.assertEqual(expected_dirblocks,
1802
[(dirinfo, [line[0:3] for line in block])
1803
for dirinfo, block in result])
1805
def test_walk_sub_dir(self):
1806
tree, expected_dirblocks = self._get_ascii_tree()
1807
self.build_tree(tree)
1808
# you can search a subdir only, with a supplied prefix.
1809
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1810
# Filter out stat and abspath
1811
self.assertEqual(expected_dirblocks[1:],
1812
[(dirinfo, [line[0:3] for line in block])
1813
for dirinfo, block in result])
1815
def _get_unicode_tree(self):
1816
name0u = u'0file-\xb6'
1817
name1u = u'1dir-\u062c\u0648'
1818
name2u = u'2file-\u0633'
1822
name1u + '/' + name0u,
1823
name1u + '/' + name1u + '/',
1826
name0 = name0u.encode('UTF-8')
1827
name1 = name1u.encode('UTF-8')
1828
name2 = name2u.encode('UTF-8')
1829
expected_dirblocks = [
1831
[(name0, name0, 'file', './' + name0u),
1832
(name1, name1, 'directory', './' + name1u),
1833
(name2, name2, 'file', './' + name2u),
1836
((name1, './' + name1u),
1837
[(name1 + '/' + name0, name0, 'file', './' + name1u
1839
(name1 + '/' + name1, name1, 'directory', './' + name1u
1843
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1848
return tree, expected_dirblocks
1850
def _filter_out(self, raw_dirblocks):
1851
"""Filter out a walkdirs_utf8 result.
1853
stat field is removed, all native paths are converted to unicode
1855
filtered_dirblocks = []
1856
for dirinfo, block in raw_dirblocks:
1857
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1860
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1861
filtered_dirblocks.append((dirinfo, details))
1862
return filtered_dirblocks
1864
def test_walk_unicode_tree(self):
1865
self.requireFeature(features.UnicodeFilenameFeature)
1866
tree, expected_dirblocks = self._get_unicode_tree()
1867
self.build_tree(tree)
1868
result = list(osutils._walkdirs_utf8('.'))
1869
self.assertEqual(expected_dirblocks, self._filter_out(result))
1871
def test_symlink(self):
1872
self.requireFeature(features.SymlinkFeature)
1873
self.requireFeature(features.UnicodeFilenameFeature)
1874
target = u'target\N{Euro Sign}'
1875
link_name = u'l\N{Euro Sign}nk'
1876
os.symlink(target, link_name)
1877
target_utf8 = target.encode('UTF-8')
1878
link_name_utf8 = link_name.encode('UTF-8')
1879
expected_dirblocks = [
1881
[(link_name_utf8, link_name_utf8,
1882
'symlink', './' + link_name),],
1884
result = list(osutils._walkdirs_utf8('.'))
1885
self.assertEqual(expected_dirblocks, self._filter_out(result))
1888
class TestReadLink(tests.TestCaseInTempDir):
1889
"""Exposes os.readlink() problems and the osutils solution.
1891
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1892
unicode string will be returned if a unicode string is passed.
1894
But prior python versions failed to properly encode the passed unicode
1897
_test_needs_features = [features.SymlinkFeature, features.UnicodeFilenameFeature]
1900
super(tests.TestCaseInTempDir, self).setUp()
1901
self.link = u'l\N{Euro Sign}ink'
1902
self.target = u'targe\N{Euro Sign}t'
1903
os.symlink(self.target, self.link)
1905
def test_os_readlink_link_encoding(self):
1906
self.assertEquals(self.target, os.readlink(self.link))
1908
def test_os_readlink_link_decoding(self):
1909
self.assertEquals(self.target.encode(osutils._fs_enc),
1910
os.readlink(self.link.encode(osutils._fs_enc)))
1913
class TestConcurrency(tests.TestCase):
1916
super(TestConcurrency, self).setUp()
1917
self.overrideAttr(osutils, '_cached_local_concurrency')
1919
def test_local_concurrency(self):
1920
concurrency = osutils.local_concurrency()
1921
self.assertIsInstance(concurrency, int)
1923
def test_local_concurrency_environment_variable(self):
1924
self.overrideEnv('BZR_CONCURRENCY', '2')
1925
self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1926
self.overrideEnv('BZR_CONCURRENCY', '3')
1927
self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1928
self.overrideEnv('BZR_CONCURRENCY', 'foo')
1929
self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1931
def test_option_concurrency(self):
1932
self.overrideEnv('BZR_CONCURRENCY', '1')
1933
self.run_bzr('rocks --concurrency 42')
1934
# Command line overrides environment variable
1935
self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1936
self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1939
class TestFailedToLoadExtension(tests.TestCase):
1941
def _try_loading(self):
1943
import bzrlib._fictional_extension_py
1944
except ImportError, e:
1945
osutils.failed_to_load_extension(e)
1949
super(TestFailedToLoadExtension, self).setUp()
1950
self.overrideAttr(osutils, '_extension_load_failures', [])
1952
def test_failure_to_load(self):
1954
self.assertLength(1, osutils._extension_load_failures)
1955
self.assertEquals(osutils._extension_load_failures[0],
1956
"No module named _fictional_extension_py")
1958
def test_report_extension_load_failures_no_warning(self):
1959
self.assertTrue(self._try_loading())
1960
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1961
# it used to give a Python warning; it no longer does
1962
self.assertLength(0, warnings)
1964
def test_report_extension_load_failures_message(self):
1966
trace.push_log_file(log)
1967
self.assertTrue(self._try_loading())
1968
osutils.report_extension_load_failures()
1969
self.assertContainsRe(
1971
r"bzr: warning: some compiled extensions could not be loaded; "
1972
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1976
class TestTerminalWidth(tests.TestCase):
1979
tests.TestCase.setUp(self)
1980
self._orig_terminal_size_state = osutils._terminal_size_state
1981
self._orig_first_terminal_size = osutils._first_terminal_size
1982
self.addCleanup(self.restore_osutils_globals)
1983
osutils._terminal_size_state = 'no_data'
1984
osutils._first_terminal_size = None
1986
def restore_osutils_globals(self):
1987
osutils._terminal_size_state = self._orig_terminal_size_state
1988
osutils._first_terminal_size = self._orig_first_terminal_size
1990
def replace_stdout(self, new):
1991
self.overrideAttr(sys, 'stdout', new)
1993
def replace__terminal_size(self, new):
1994
self.overrideAttr(osutils, '_terminal_size', new)
1996
def set_fake_tty(self):
1998
class I_am_a_tty(object):
2002
self.replace_stdout(I_am_a_tty())
2004
def test_default_values(self):
2005
self.assertEqual(80, osutils.default_terminal_width)
2007
def test_defaults_to_BZR_COLUMNS(self):
2008
# BZR_COLUMNS is set by the test framework
2009
self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
2010
self.overrideEnv('BZR_COLUMNS', '12')
2011
self.assertEqual(12, osutils.terminal_width())
2013
def test_BZR_COLUMNS_0_no_limit(self):
2014
self.overrideEnv('BZR_COLUMNS', '0')
2015
self.assertEqual(None, osutils.terminal_width())
2017
def test_falls_back_to_COLUMNS(self):
2018
self.overrideEnv('BZR_COLUMNS', None)
2019
self.assertNotEqual('42', os.environ['COLUMNS'])
2021
self.overrideEnv('COLUMNS', '42')
2022
self.assertEqual(42, osutils.terminal_width())
2024
def test_tty_default_without_columns(self):
2025
self.overrideEnv('BZR_COLUMNS', None)
2026
self.overrideEnv('COLUMNS', None)
2028
def terminal_size(w, h):
2032
# We need to override the osutils definition as it depends on the
2033
# running environment that we can't control (PQM running without a
2034
# controlling terminal is one example).
2035
self.replace__terminal_size(terminal_size)
2036
self.assertEqual(42, osutils.terminal_width())
2038
def test_non_tty_default_without_columns(self):
2039
self.overrideEnv('BZR_COLUMNS', None)
2040
self.overrideEnv('COLUMNS', None)
2041
self.replace_stdout(None)
2042
self.assertEqual(None, osutils.terminal_width())
2044
def test_no_TIOCGWINSZ(self):
2045
self.requireFeature(term_ios_feature)
2046
termios = term_ios_feature.module
2047
# bug 63539 is about a termios without TIOCGWINSZ attribute
2049
orig = termios.TIOCGWINSZ
2050
except AttributeError:
2051
# We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2054
self.overrideAttr(termios, 'TIOCGWINSZ')
2055
del termios.TIOCGWINSZ
2056
self.overrideEnv('BZR_COLUMNS', None)
2057
self.overrideEnv('COLUMNS', None)
2058
# Whatever the result is, if we don't raise an exception, it's ok.
2059
osutils.terminal_width()
2062
class TestCreationOps(tests.TestCaseInTempDir):
2063
_test_needs_features = [features.chown_feature]
2066
tests.TestCaseInTempDir.setUp(self)
2067
self.overrideAttr(os, 'chown', self._dummy_chown)
2069
# params set by call to _dummy_chown
2070
self.path = self.uid = self.gid = None
2072
def _dummy_chown(self, path, uid, gid):
2073
self.path, self.uid, self.gid = path, uid, gid
2075
def test_copy_ownership_from_path(self):
2076
"""copy_ownership_from_path test with specified src."""
2078
f = open('test_file', 'wt')
2079
osutils.copy_ownership_from_path('test_file', ownsrc)
2082
self.assertEquals(self.path, 'test_file')
2083
self.assertEquals(self.uid, s.st_uid)
2084
self.assertEquals(self.gid, s.st_gid)
2086
def test_copy_ownership_nonesrc(self):
2087
"""copy_ownership_from_path test with src=None."""
2088
f = open('test_file', 'wt')
2089
# should use parent dir for permissions
2090
osutils.copy_ownership_from_path('test_file')
2093
self.assertEquals(self.path, 'test_file')
2094
self.assertEquals(self.uid, s.st_uid)
2095
self.assertEquals(self.gid, s.st_gid)
2098
class TestPathFromEnviron(tests.TestCase):
2100
def test_is_unicode(self):
2101
self.overrideEnv('BZR_TEST_PATH', './anywhere at all/')
2102
path = osutils.path_from_environ('BZR_TEST_PATH')
2103
self.assertIsInstance(path, unicode)
2104
self.assertEqual(u'./anywhere at all/', path)
2106
def test_posix_path_env_ascii(self):
2107
self.overrideEnv('BZR_TEST_PATH', '/tmp')
2108
home = osutils._posix_path_from_environ('BZR_TEST_PATH')
2109
self.assertIsInstance(home, unicode)
2110
self.assertEqual(u'/tmp', home)
2112
def test_posix_path_env_unicode(self):
2113
self.requireFeature(features.ByteStringNamedFilesystem)
2114
self.overrideEnv('BZR_TEST_PATH', '/home/\xa7test')
2115
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2116
self.assertEqual(u'/home/\xa7test',
2117
osutils._posix_path_from_environ('BZR_TEST_PATH'))
2118
osutils._fs_enc = "iso8859-5"
2119
self.assertEqual(u'/home/\u0407test',
2120
osutils._posix_path_from_environ('BZR_TEST_PATH'))
2121
osutils._fs_enc = "utf-8"
2122
self.assertRaises(errors.BadFilenameEncoding,
2123
osutils._posix_path_from_environ, 'BZR_TEST_PATH')
2126
class TestGetHomeDir(tests.TestCase):
2128
def test_is_unicode(self):
2129
home = osutils._get_home_dir()
2130
self.assertIsInstance(home, unicode)
2132
def test_posix_homeless(self):
2133
self.overrideEnv('HOME', None)
2134
home = osutils._get_home_dir()
2135
self.assertIsInstance(home, unicode)
2137
def test_posix_home_ascii(self):
2138
self.overrideEnv('HOME', '/home/test')
2139
home = osutils._posix_get_home_dir()
2140
self.assertIsInstance(home, unicode)
2141
self.assertEqual(u'/home/test', home)
2143
def test_posix_home_unicode(self):
2144
self.requireFeature(features.ByteStringNamedFilesystem)
2145
self.overrideEnv('HOME', '/home/\xa7test')
2146
self.overrideAttr(osutils, "_fs_enc", "iso8859-1")
2147
self.assertEqual(u'/home/\xa7test', osutils._posix_get_home_dir())
2148
osutils._fs_enc = "iso8859-5"
2149
self.assertEqual(u'/home/\u0407test', osutils._posix_get_home_dir())
2150
osutils._fs_enc = "utf-8"
2151
self.assertRaises(errors.BadFilenameEncoding,
2152
osutils._posix_get_home_dir)
2155
class TestGetuserUnicode(tests.TestCase):
2157
def test_is_unicode(self):
2158
user = osutils.getuser_unicode()
2159
self.assertIsInstance(user, unicode)
2161
def envvar_to_override(self):
2162
if sys.platform == "win32":
2163
# Disable use of platform calls on windows so envvar is used
2164
self.overrideAttr(win32utils, 'has_ctypes', False)
2165
return 'USERNAME' # only variable used on windows
2166
return 'LOGNAME' # first variable checked by getpass.getuser()
2168
def test_ascii_user(self):
2169
self.overrideEnv(self.envvar_to_override(), 'jrandom')
2170
self.assertEqual(u'jrandom', osutils.getuser_unicode())
2172
def test_unicode_user(self):
2173
ue = osutils.get_user_encoding()
2174
uni_val, env_val = tests.probe_unicode_in_user_encoding()
2176
raise tests.TestSkipped(
2177
'Cannot find a unicode character that works in encoding %s'
2178
% (osutils.get_user_encoding(),))
2179
uni_username = u'jrandom' + uni_val
2180
encoded_username = uni_username.encode(ue)
2181
self.overrideEnv(self.envvar_to_override(), encoded_username)
2182
self.assertEqual(uni_username, osutils.getuser_unicode())
2185
class TestBackupNames(tests.TestCase):
2188
super(TestBackupNames, self).setUp()
2191
def backup_exists(self, name):
2192
return name in self.backups
2194
def available_backup_name(self, name):
2195
backup_name = osutils.available_backup_name(name, self.backup_exists)
2196
self.backups.append(backup_name)
2199
def assertBackupName(self, expected, name):
2200
self.assertEqual(expected, self.available_backup_name(name))
2202
def test_empty(self):
2203
self.assertBackupName('file.~1~', 'file')
2205
def test_existing(self):
2206
self.available_backup_name('file')
2207
self.available_backup_name('file')
2208
self.assertBackupName('file.~3~', 'file')
2209
# Empty slots are found, this is not a strict requirement and may be
2210
# revisited if we test against all implementations.
2211
self.backups.remove('file.~2~')
2212
self.assertBackupName('file.~2~', 'file')
2215
class TestFindExecutableInPath(tests.TestCase):
2217
def test_windows(self):
2218
if sys.platform != 'win32':
2219
raise tests.TestSkipped('test requires win32')
2220
self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
2222
osutils.find_executable_on_path('explorer.exe') is not None)
2224
osutils.find_executable_on_path('EXPLORER.EXE') is not None)
2226
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2227
self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2229
def test_other(self):
2230
if sys.platform == 'win32':
2231
raise tests.TestSkipped('test requires non-win32')
2232
self.assertTrue(osutils.find_executable_on_path('sh') is not None)
2234
osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2237
class TestEnvironmentErrors(tests.TestCase):
2238
"""Test handling of environmental errors"""
2240
def test_is_oserror(self):
2241
self.assertTrue(osutils.is_environment_error(
2242
OSError(errno.EINVAL, "Invalid parameter")))
2244
def test_is_ioerror(self):
2245
self.assertTrue(osutils.is_environment_error(
2246
IOError(errno.EINVAL, "Invalid parameter")))
2248
def test_is_socket_error(self):
2249
self.assertTrue(osutils.is_environment_error(
2250
socket.error(errno.EINVAL, "Invalid parameter")))
2252
def test_is_select_error(self):
2253
self.assertTrue(osutils.is_environment_error(
2254
select.error(errno.EINVAL, "Invalid parameter")))
2256
def test_is_pywintypes_error(self):
2257
self.requireFeature(features.pywintypes)
2259
self.assertTrue(osutils.is_environment_error(
2260
pywintypes.error(errno.EINVAL, "Invalid parameter", "Caller")))