1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
term_ios_feature = tests.ModuleAvailableFeature('termios')
59
def _already_unicode(s):
63
def _utf8_to_unicode(s):
64
return s.decode('UTF-8')
67
def dir_reader_scenarios():
68
# For each dir reader we define:
70
# - native_to_unicode: a function converting the native_abspath as returned
71
# by DirReader.read_dir to its unicode representation
73
# UnicodeDirReader is the fallback, it should be tested on all platforms.
74
scenarios = [('unicode',
75
dict(_dir_reader_class=osutils.UnicodeDirReader,
76
_native_to_unicode=_already_unicode))]
77
# Some DirReaders are platform specific and even there they may not be
79
if UTF8DirReaderFeature.available():
80
from bzrlib import _readdir_pyx
81
scenarios.append(('utf8',
82
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
83
_native_to_unicode=_utf8_to_unicode)))
85
if test__walkdirs_win32.win32_readdir_feature.available():
87
from bzrlib import _walkdirs_win32
90
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
91
_native_to_unicode=_already_unicode)))
97
def load_tests(basic_tests, module, loader):
98
suite = loader.suiteClass()
99
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
100
basic_tests, tests.condition_isinstance(TestDirReader))
101
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
102
suite.addTest(remaining_tests)
106
class TestContainsWhitespace(tests.TestCase):
108
def test_contains_whitespace(self):
109
self.failUnless(osutils.contains_whitespace(u' '))
110
self.failUnless(osutils.contains_whitespace(u'hello there'))
111
self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
112
self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
113
self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
114
self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
116
# \xa0 is "Non-breaking-space" which on some python locales thinks it
117
# is whitespace, but we do not.
118
self.failIf(osutils.contains_whitespace(u''))
119
self.failIf(osutils.contains_whitespace(u'hellothere'))
120
self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
123
class TestRename(tests.TestCaseInTempDir):
125
def create_file(self, filename, content):
126
f = open(filename, 'wb')
132
def _fancy_rename(self, a, b):
133
osutils.fancy_rename(a, b, rename_func=os.rename,
134
unlink_func=os.unlink)
136
def test_fancy_rename(self):
137
# This should work everywhere
138
self.create_file('a', 'something in a\n')
139
self._fancy_rename('a', 'b')
140
self.failIfExists('a')
141
self.failUnlessExists('b')
142
self.check_file_contents('b', 'something in a\n')
144
self.create_file('a', 'new something in a\n')
145
self._fancy_rename('b', 'a')
147
self.check_file_contents('a', 'something in a\n')
149
def test_fancy_rename_fails_source_missing(self):
150
# An exception should be raised, and the target should be left in place
151
self.create_file('target', 'data in target\n')
152
self.assertRaises((IOError, OSError), self._fancy_rename,
153
'missingsource', 'target')
154
self.failUnlessExists('target')
155
self.check_file_contents('target', 'data in target\n')
157
def test_fancy_rename_fails_if_source_and_target_missing(self):
158
self.assertRaises((IOError, OSError), self._fancy_rename,
159
'missingsource', 'missingtarget')
161
def test_rename(self):
162
# Rename should be semi-atomic on all platforms
163
self.create_file('a', 'something in a\n')
164
osutils.rename('a', 'b')
165
self.failIfExists('a')
166
self.failUnlessExists('b')
167
self.check_file_contents('b', 'something in a\n')
169
self.create_file('a', 'new something in a\n')
170
osutils.rename('b', 'a')
172
self.check_file_contents('a', 'something in a\n')
174
# TODO: test fancy_rename using a MemoryTransport
176
def test_rename_change_case(self):
177
# on Windows we should be able to change filename case by rename
178
self.build_tree(['a', 'b/'])
179
osutils.rename('a', 'A')
180
osutils.rename('b', 'B')
181
# we can't use failUnlessExists on case-insensitive filesystem
182
# so try to check shape of the tree
183
shape = sorted(os.listdir('.'))
184
self.assertEquals(['A', 'B'], shape)
187
class TestRandChars(tests.TestCase):
189
def test_01_rand_chars_empty(self):
190
result = osutils.rand_chars(0)
191
self.assertEqual(result, '')
193
def test_02_rand_chars_100(self):
194
result = osutils.rand_chars(100)
195
self.assertEqual(len(result), 100)
196
self.assertEqual(type(result), str)
197
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
200
class TestIsInside(tests.TestCase):
202
def test_is_inside(self):
203
is_inside = osutils.is_inside
204
self.assertTrue(is_inside('src', 'src/foo.c'))
205
self.assertFalse(is_inside('src', 'srccontrol'))
206
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
207
self.assertTrue(is_inside('foo.c', 'foo.c'))
208
self.assertFalse(is_inside('foo.c', ''))
209
self.assertTrue(is_inside('', 'foo.c'))
211
def test_is_inside_any(self):
212
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
213
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
214
(['src'], SRC_FOO_C),
217
self.assert_(osutils.is_inside_any(dirs, fn))
218
for dirs, fn in [(['src'], 'srccontrol'),
219
(['src'], 'srccontrol/foo')]:
220
self.assertFalse(osutils.is_inside_any(dirs, fn))
222
def test_is_inside_or_parent_of_any(self):
223
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
224
(['src'], 'src/foo.c'),
225
(['src/bar.c'], 'src'),
226
(['src/bar.c', 'bla/foo.c'], 'src'),
229
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
231
for dirs, fn in [(['src'], 'srccontrol'),
232
(['srccontrol/foo.c'], 'src'),
233
(['src'], 'srccontrol/foo')]:
234
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
237
class TestRmTree(tests.TestCaseInTempDir):
239
def test_rmtree(self):
240
# Check to remove tree with read-only files/dirs
242
f = file('dir/file', 'w')
245
# would like to also try making the directory readonly, but at the
246
# moment python shutil.rmtree doesn't handle that properly - it would
247
# need to chmod the directory before removing things inside it - deferred
248
# for now -- mbp 20060505
249
# osutils.make_readonly('dir')
250
osutils.make_readonly('dir/file')
252
osutils.rmtree('dir')
254
self.failIfExists('dir/file')
255
self.failIfExists('dir')
258
class TestDeleteAny(tests.TestCaseInTempDir):
260
def test_delete_any_readonly(self):
261
# from <https://bugs.launchpad.net/bzr/+bug/218206>
262
self.build_tree(['d/', 'f'])
263
osutils.make_readonly('d')
264
osutils.make_readonly('f')
266
osutils.delete_any('f')
267
osutils.delete_any('d')
270
class TestKind(tests.TestCaseInTempDir):
272
def test_file_kind(self):
273
self.build_tree(['file', 'dir/'])
274
self.assertEquals('file', osutils.file_kind('file'))
275
self.assertEquals('directory', osutils.file_kind('dir/'))
276
if osutils.has_symlinks():
277
os.symlink('symlink', 'symlink')
278
self.assertEquals('symlink', osutils.file_kind('symlink'))
280
# TODO: jam 20060529 Test a block device
282
os.lstat('/dev/null')
284
if e.errno not in (errno.ENOENT,):
287
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
289
mkfifo = getattr(os, 'mkfifo', None)
293
self.assertEquals('fifo', osutils.file_kind('fifo'))
297
AF_UNIX = getattr(socket, 'AF_UNIX', None)
299
s = socket.socket(AF_UNIX)
302
self.assertEquals('socket', osutils.file_kind('socket'))
306
def test_kind_marker(self):
307
self.assertEqual("", osutils.kind_marker("file"))
308
self.assertEqual("/", osutils.kind_marker('directory'))
309
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
310
self.assertEqual("@", osutils.kind_marker("symlink"))
311
self.assertEqual("+", osutils.kind_marker("tree-reference"))
312
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
315
class TestUmask(tests.TestCaseInTempDir):
317
def test_get_umask(self):
318
if sys.platform == 'win32':
319
# umask always returns '0', no way to set it
320
self.assertEqual(0, osutils.get_umask())
323
orig_umask = osutils.get_umask()
324
self.addCleanup(os.umask, orig_umask)
326
self.assertEqual(0222, osutils.get_umask())
328
self.assertEqual(0022, osutils.get_umask())
330
self.assertEqual(0002, osutils.get_umask())
332
self.assertEqual(0027, osutils.get_umask())
335
class TestDateTime(tests.TestCase):
337
def assertFormatedDelta(self, expected, seconds):
338
"""Assert osutils.format_delta formats as expected"""
339
actual = osutils.format_delta(seconds)
340
self.assertEqual(expected, actual)
342
def test_format_delta(self):
343
self.assertFormatedDelta('0 seconds ago', 0)
344
self.assertFormatedDelta('1 second ago', 1)
345
self.assertFormatedDelta('10 seconds ago', 10)
346
self.assertFormatedDelta('59 seconds ago', 59)
347
self.assertFormatedDelta('89 seconds ago', 89)
348
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
349
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
350
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
351
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
352
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
353
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
354
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
355
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
356
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
357
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
358
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
359
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
360
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
361
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
362
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
363
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
364
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
365
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
367
# We handle when time steps the wrong direction because computers
368
# don't have synchronized clocks.
369
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
370
self.assertFormatedDelta('1 second in the future', -1)
371
self.assertFormatedDelta('2 seconds in the future', -2)
373
def test_format_date(self):
374
self.assertRaises(errors.UnsupportedTimezoneFormat,
375
osutils.format_date, 0, timezone='foo')
376
self.assertIsInstance(osutils.format_date(0), str)
377
self.assertIsInstance(osutils.format_local_date(0), unicode)
378
# Testing for the actual value of the local weekday without
379
# duplicating the code from format_date is difficult.
380
# Instead blackbox.test_locale should check for localized
381
# dates once they do occur in output strings.
383
def test_format_date_with_offset_in_original_timezone(self):
384
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
385
osutils.format_date_with_offset_in_original_timezone(0))
386
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
387
osutils.format_date_with_offset_in_original_timezone(100000))
388
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
389
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
391
def test_local_time_offset(self):
392
"""Test that local_time_offset() returns a sane value."""
393
offset = osutils.local_time_offset()
394
self.assertTrue(isinstance(offset, int))
395
# Test that the offset is no more than a eighteen hours in
397
# Time zone handling is system specific, so it is difficult to
398
# do more specific tests, but a value outside of this range is
400
eighteen_hours = 18 * 3600
401
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
403
def test_local_time_offset_with_timestamp(self):
404
"""Test that local_time_offset() works with a timestamp."""
405
offset = osutils.local_time_offset(1000000000.1234567)
406
self.assertTrue(isinstance(offset, int))
407
eighteen_hours = 18 * 3600
408
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
411
class TestLinks(tests.TestCaseInTempDir):
413
def test_dereference_path(self):
414
self.requireFeature(tests.SymlinkFeature)
415
cwd = osutils.realpath('.')
417
bar_path = osutils.pathjoin(cwd, 'bar')
418
# Using './' to avoid bug #1213894 (first path component not
419
# dereferenced) in Python 2.4.1 and earlier
420
self.assertEqual(bar_path, osutils.realpath('./bar'))
421
os.symlink('bar', 'foo')
422
self.assertEqual(bar_path, osutils.realpath('./foo'))
424
# Does not dereference terminal symlinks
425
foo_path = osutils.pathjoin(cwd, 'foo')
426
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
428
# Dereferences parent symlinks
430
baz_path = osutils.pathjoin(bar_path, 'baz')
431
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
433
# Dereferences parent symlinks that are the first path element
434
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
436
# Dereferences parent symlinks in absolute paths
437
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
438
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
440
def test_changing_access(self):
441
f = file('file', 'w')
445
# Make a file readonly
446
osutils.make_readonly('file')
447
mode = os.lstat('file').st_mode
448
self.assertEqual(mode, mode & 0777555)
450
# Make a file writable
451
osutils.make_writable('file')
452
mode = os.lstat('file').st_mode
453
self.assertEqual(mode, mode | 0200)
455
if osutils.has_symlinks():
456
# should not error when handed a symlink
457
os.symlink('nonexistent', 'dangling')
458
osutils.make_readonly('dangling')
459
osutils.make_writable('dangling')
461
def test_host_os_dereferences_symlinks(self):
462
osutils.host_os_dereferences_symlinks()
465
class TestCanonicalRelPath(tests.TestCaseInTempDir):
467
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
469
def test_canonical_relpath_simple(self):
470
f = file('MixedCaseName', 'w')
472
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
473
self.failUnlessEqual('work/MixedCaseName', actual)
475
def test_canonical_relpath_missing_tail(self):
476
os.mkdir('MixedCaseParent')
477
actual = osutils.canonical_relpath(self.test_base_dir,
478
'mixedcaseparent/nochild')
479
self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
482
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
484
def assertRelpath(self, expected, base, path):
485
actual = osutils._cicp_canonical_relpath(base, path)
486
self.assertEqual(expected, actual)
488
def test_simple(self):
489
self.build_tree(['MixedCaseName'])
490
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
491
self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
493
def test_subdir_missing_tail(self):
494
self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
495
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
496
self.assertRelpath('MixedCaseParent/a_child', base,
497
'MixedCaseParent/a_child')
498
self.assertRelpath('MixedCaseParent/a_child', base,
499
'MixedCaseParent/A_Child')
500
self.assertRelpath('MixedCaseParent/not_child', base,
501
'MixedCaseParent/not_child')
503
def test_at_root_slash(self):
504
# We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
506
if osutils.MIN_ABS_PATHLENGTH > 1:
507
raise tests.TestSkipped('relpath requires %d chars'
508
% osutils.MIN_ABS_PATHLENGTH)
509
self.assertRelpath('foo', '/', '/foo')
511
def test_at_root_drive(self):
512
if sys.platform != 'win32':
513
raise tests.TestNotApplicable('we can only test drive-letter relative'
514
' paths on Windows where we have drive'
517
# The specific issue is that when at the root of a drive, 'abspath'
518
# returns "C:/" or just "/". However, the code assumes that abspath
519
# always returns something like "C:/foo" or "/foo" (no trailing slash).
520
self.assertRelpath('foo', 'C:/', 'C:/foo')
521
self.assertRelpath('foo', 'X:/', 'X:/foo')
522
self.assertRelpath('foo', 'X:/', 'X://foo')
525
class TestPumpFile(tests.TestCase):
526
"""Test pumpfile method."""
529
tests.TestCase.setUp(self)
530
# create a test datablock
531
self.block_size = 512
532
pattern = '0123456789ABCDEF'
533
self.test_data = pattern * (3 * self.block_size / len(pattern))
534
self.test_data_len = len(self.test_data)
536
def test_bracket_block_size(self):
537
"""Read data in blocks with the requested read size bracketing the
539
# make sure test data is larger than max read size
540
self.assertTrue(self.test_data_len > self.block_size)
542
from_file = file_utils.FakeReadFile(self.test_data)
545
# read (max / 2) bytes and verify read size wasn't affected
546
num_bytes_to_read = self.block_size / 2
547
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
548
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
549
self.assertEqual(from_file.get_read_count(), 1)
551
# read (max) bytes and verify read size wasn't affected
552
num_bytes_to_read = self.block_size
553
from_file.reset_read_count()
554
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
555
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
556
self.assertEqual(from_file.get_read_count(), 1)
558
# read (max + 1) bytes and verify read size was limited
559
num_bytes_to_read = self.block_size + 1
560
from_file.reset_read_count()
561
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
562
self.assertEqual(from_file.get_max_read_size(), self.block_size)
563
self.assertEqual(from_file.get_read_count(), 2)
565
# finish reading the rest of the data
566
num_bytes_to_read = self.test_data_len - to_file.tell()
567
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
569
# report error if the data wasn't equal (we only report the size due
570
# to the length of the data)
571
response_data = to_file.getvalue()
572
if response_data != self.test_data:
573
message = "Data not equal. Expected %d bytes, received %d."
574
self.fail(message % (len(response_data), self.test_data_len))
576
def test_specified_size(self):
577
"""Request a transfer larger than the maximum block size and verify
578
that the maximum read doesn't exceed the block_size."""
579
# make sure test data is larger than max read size
580
self.assertTrue(self.test_data_len > self.block_size)
582
# retrieve data in blocks
583
from_file = file_utils.FakeReadFile(self.test_data)
585
osutils.pumpfile(from_file, to_file, self.test_data_len,
588
# verify read size was equal to the maximum read size
589
self.assertTrue(from_file.get_max_read_size() > 0)
590
self.assertEqual(from_file.get_max_read_size(), self.block_size)
591
self.assertEqual(from_file.get_read_count(), 3)
593
# report error if the data wasn't equal (we only report the size due
594
# to the length of the data)
595
response_data = to_file.getvalue()
596
if response_data != self.test_data:
597
message = "Data not equal. Expected %d bytes, received %d."
598
self.fail(message % (len(response_data), self.test_data_len))
600
def test_to_eof(self):
601
"""Read to end-of-file and verify that the reads are not larger than
602
the maximum read size."""
603
# make sure test data is larger than max read size
604
self.assertTrue(self.test_data_len > self.block_size)
606
# retrieve data to EOF
607
from_file = file_utils.FakeReadFile(self.test_data)
609
osutils.pumpfile(from_file, to_file, -1, self.block_size)
611
# verify read size was equal to the maximum read size
612
self.assertEqual(from_file.get_max_read_size(), self.block_size)
613
self.assertEqual(from_file.get_read_count(), 4)
615
# report error if the data wasn't equal (we only report the size due
616
# to the length of the data)
617
response_data = to_file.getvalue()
618
if response_data != self.test_data:
619
message = "Data not equal. Expected %d bytes, received %d."
620
self.fail(message % (len(response_data), self.test_data_len))
622
def test_defaults(self):
623
"""Verifies that the default arguments will read to EOF -- this
624
test verifies that any existing usages of pumpfile will not be broken
625
with this new version."""
626
# retrieve data using default (old) pumpfile method
627
from_file = file_utils.FakeReadFile(self.test_data)
629
osutils.pumpfile(from_file, to_file)
631
# report error if the data wasn't equal (we only report the size due
632
# to the length of the data)
633
response_data = to_file.getvalue()
634
if response_data != self.test_data:
635
message = "Data not equal. Expected %d bytes, received %d."
636
self.fail(message % (len(response_data), self.test_data_len))
638
def test_report_activity(self):
640
def log_activity(length, direction):
641
activity.append((length, direction))
642
from_file = StringIO(self.test_data)
644
osutils.pumpfile(from_file, to_file, buff_size=500,
645
report_activity=log_activity, direction='read')
646
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
647
(36, 'read')], activity)
649
from_file = StringIO(self.test_data)
652
osutils.pumpfile(from_file, to_file, buff_size=500,
653
report_activity=log_activity, direction='write')
654
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
655
(36, 'write')], activity)
657
# And with a limited amount of data
658
from_file = StringIO(self.test_data)
661
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
662
report_activity=log_activity, direction='read')
663
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
667
class TestPumpStringFile(tests.TestCase):
669
def test_empty(self):
671
osutils.pump_string_file("", output)
672
self.assertEqual("", output.getvalue())
674
def test_more_than_segment_size(self):
676
osutils.pump_string_file("123456789", output, 2)
677
self.assertEqual("123456789", output.getvalue())
679
def test_segment_size(self):
681
osutils.pump_string_file("12", output, 2)
682
self.assertEqual("12", output.getvalue())
684
def test_segment_size_multiple(self):
686
osutils.pump_string_file("1234", output, 2)
687
self.assertEqual("1234", output.getvalue())
690
class TestRelpath(tests.TestCase):
692
def test_simple_relpath(self):
693
cwd = osutils.getcwd()
694
subdir = cwd + '/subdir'
695
self.assertEqual('subdir', osutils.relpath(cwd, subdir))
697
def test_deep_relpath(self):
698
cwd = osutils.getcwd()
699
subdir = cwd + '/sub/subsubdir'
700
self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
702
def test_not_relative(self):
703
self.assertRaises(errors.PathNotChild,
704
osutils.relpath, 'C:/path', 'H:/path')
705
self.assertRaises(errors.PathNotChild,
706
osutils.relpath, 'C:/', 'H:/path')
709
class TestSafeUnicode(tests.TestCase):
711
def test_from_ascii_string(self):
712
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
714
def test_from_unicode_string_ascii_contents(self):
715
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
717
def test_from_unicode_string_unicode_contents(self):
718
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
720
def test_from_utf8_string(self):
721
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
723
def test_bad_utf8_string(self):
724
self.assertRaises(errors.BzrBadParameterNotUnicode,
725
osutils.safe_unicode,
729
class TestSafeUtf8(tests.TestCase):
731
def test_from_ascii_string(self):
733
self.assertEqual('foobar', osutils.safe_utf8(f))
735
def test_from_unicode_string_ascii_contents(self):
736
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
738
def test_from_unicode_string_unicode_contents(self):
739
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
741
def test_from_utf8_string(self):
742
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
744
def test_bad_utf8_string(self):
745
self.assertRaises(errors.BzrBadParameterNotUnicode,
746
osutils.safe_utf8, '\xbb\xbb')
749
class TestSafeRevisionId(tests.TestCase):
751
def test_from_ascii_string(self):
752
# this shouldn't give a warning because it's getting an ascii string
753
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
755
def test_from_unicode_string_ascii_contents(self):
756
self.assertEqual('bargam',
757
osutils.safe_revision_id(u'bargam', warn=False))
759
def test_from_unicode_deprecated(self):
760
self.assertEqual('bargam',
761
self.callDeprecated([osutils._revision_id_warning],
762
osutils.safe_revision_id, u'bargam'))
764
def test_from_unicode_string_unicode_contents(self):
765
self.assertEqual('bargam\xc2\xae',
766
osutils.safe_revision_id(u'bargam\xae', warn=False))
768
def test_from_utf8_string(self):
769
self.assertEqual('foo\xc2\xae',
770
osutils.safe_revision_id('foo\xc2\xae'))
773
"""Currently, None is a valid revision_id"""
774
self.assertEqual(None, osutils.safe_revision_id(None))
777
class TestSafeFileId(tests.TestCase):
779
def test_from_ascii_string(self):
780
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
782
def test_from_unicode_string_ascii_contents(self):
783
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
785
def test_from_unicode_deprecated(self):
786
self.assertEqual('bargam',
787
self.callDeprecated([osutils._file_id_warning],
788
osutils.safe_file_id, u'bargam'))
790
def test_from_unicode_string_unicode_contents(self):
791
self.assertEqual('bargam\xc2\xae',
792
osutils.safe_file_id(u'bargam\xae', warn=False))
794
def test_from_utf8_string(self):
795
self.assertEqual('foo\xc2\xae',
796
osutils.safe_file_id('foo\xc2\xae'))
799
"""Currently, None is a valid revision_id"""
800
self.assertEqual(None, osutils.safe_file_id(None))
803
class TestWin32Funcs(tests.TestCase):
804
"""Test that _win32 versions of os utilities return appropriate paths."""
806
def test_abspath(self):
807
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
808
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
809
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
810
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
812
def test_realpath(self):
813
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
814
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
816
def test_pathjoin(self):
817
self.assertEqual('path/to/foo',
818
osutils._win32_pathjoin('path', 'to', 'foo'))
819
self.assertEqual('C:/foo',
820
osutils._win32_pathjoin('path\\to', 'C:\\foo'))
821
self.assertEqual('C:/foo',
822
osutils._win32_pathjoin('path/to', 'C:/foo'))
823
self.assertEqual('path/to/foo',
824
osutils._win32_pathjoin('path/to/', 'foo'))
825
self.assertEqual('/foo',
826
osutils._win32_pathjoin('C:/path/to/', '/foo'))
827
self.assertEqual('/foo',
828
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
830
def test_normpath(self):
831
self.assertEqual('path/to/foo',
832
osutils._win32_normpath(r'path\\from\..\to\.\foo'))
833
self.assertEqual('path/to/foo',
834
osutils._win32_normpath('path//from/../to/./foo'))
836
def test_getcwd(self):
837
cwd = osutils._win32_getcwd()
838
os_cwd = os.getcwdu()
839
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
840
# win32 is inconsistent whether it returns lower or upper case
841
# and even if it was consistent the user might type the other
842
# so we force it to uppercase
843
# running python.exe under cmd.exe return capital C:\\
844
# running win32 python inside a cygwin shell returns lowercase
845
self.assertEqual(os_cwd[0].upper(), cwd[0])
847
def test_fixdrive(self):
848
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
849
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
850
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
852
def test_win98_abspath(self):
854
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
855
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
857
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
858
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
860
cwd = osutils.getcwd().rstrip('/')
861
drive = osutils._nt_splitdrive(cwd)[0]
862
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
863
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
866
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
869
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
870
"""Test win32 functions that create files."""
872
def test_getcwd(self):
873
self.requireFeature(tests.UnicodeFilenameFeature)
876
# TODO: jam 20060427 This will probably fail on Mac OSX because
877
# it will change the normalization of B\xe5gfors
878
# Consider using a different unicode character, or make
879
# osutils.getcwd() renormalize the path.
880
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
882
def test_minimum_path_selection(self):
883
self.assertEqual(set(),
884
osutils.minimum_path_selection([]))
885
self.assertEqual(set(['a']),
886
osutils.minimum_path_selection(['a']))
887
self.assertEqual(set(['a', 'b']),
888
osutils.minimum_path_selection(['a', 'b']))
889
self.assertEqual(set(['a/', 'b']),
890
osutils.minimum_path_selection(['a/', 'b']))
891
self.assertEqual(set(['a/', 'b']),
892
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
893
self.assertEqual(set(['a-b', 'a', 'a0b']),
894
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
896
def test_mkdtemp(self):
897
tmpdir = osutils._win32_mkdtemp(dir='.')
898
self.assertFalse('\\' in tmpdir)
900
def test_rename(self):
908
osutils._win32_rename('b', 'a')
909
self.failUnlessExists('a')
910
self.failIfExists('b')
911
self.assertFileEqual('baz\n', 'a')
913
def test_rename_missing_file(self):
919
osutils._win32_rename('b', 'a')
920
except (IOError, OSError), e:
921
self.assertEqual(errno.ENOENT, e.errno)
922
self.assertFileEqual('foo\n', 'a')
924
def test_rename_missing_dir(self):
927
osutils._win32_rename('b', 'a')
928
except (IOError, OSError), e:
929
self.assertEqual(errno.ENOENT, e.errno)
931
def test_rename_current_dir(self):
934
# You can't rename the working directory
935
# doing rename non-existant . usually
936
# just raises ENOENT, since non-existant
939
osutils._win32_rename('b', '.')
940
except (IOError, OSError), e:
941
self.assertEqual(errno.ENOENT, e.errno)
943
def test_splitpath(self):
944
def check(expected, path):
945
self.assertEqual(expected, osutils.splitpath(path))
948
check(['a', 'b'], 'a/b')
949
check(['a', 'b'], 'a/./b')
950
check(['a', '.b'], 'a/.b')
951
check(['a', '.b'], 'a\\.b')
953
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
956
class TestParentDirectories(tests.TestCaseInTempDir):
957
"""Test osutils.parent_directories()"""
959
def test_parent_directories(self):
960
self.assertEqual([], osutils.parent_directories('a'))
961
self.assertEqual(['a'], osutils.parent_directories('a/b'))
962
self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
965
class TestMacFuncsDirs(tests.TestCaseInTempDir):
966
"""Test mac special functions that require directories."""
968
def test_getcwd(self):
969
self.requireFeature(tests.UnicodeFilenameFeature)
970
os.mkdir(u'B\xe5gfors')
971
os.chdir(u'B\xe5gfors')
972
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
974
def test_getcwd_nonnorm(self):
975
self.requireFeature(tests.UnicodeFilenameFeature)
976
# Test that _mac_getcwd() will normalize this path
977
os.mkdir(u'Ba\u030agfors')
978
os.chdir(u'Ba\u030agfors')
979
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
982
class TestChunksToLines(tests.TestCase):
984
def test_smoketest(self):
985
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
986
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
987
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
988
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
990
def test_osutils_binding(self):
991
from bzrlib.tests import test__chunks_to_lines
992
if test__chunks_to_lines.compiled_chunkstolines_feature.available():
993
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
995
from bzrlib._chunks_to_lines_py import chunks_to_lines
996
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
999
class TestSplitLines(tests.TestCase):
1001
def test_split_unicode(self):
1002
self.assertEqual([u'foo\n', u'bar\xae'],
1003
osutils.split_lines(u'foo\nbar\xae'))
1004
self.assertEqual([u'foo\n', u'bar\xae\n'],
1005
osutils.split_lines(u'foo\nbar\xae\n'))
1007
def test_split_with_carriage_returns(self):
1008
self.assertEqual(['foo\rbar\n'],
1009
osutils.split_lines('foo\rbar\n'))
1012
class TestWalkDirs(tests.TestCaseInTempDir):
1014
def assertExpectedBlocks(self, expected, result):
1015
self.assertEqual(expected,
1016
[(dirinfo, [line[0:3] for line in block])
1017
for dirinfo, block in result])
1019
def test_walkdirs(self):
1028
self.build_tree(tree)
1029
expected_dirblocks = [
1031
[('0file', '0file', 'file'),
1032
('1dir', '1dir', 'directory'),
1033
('2file', '2file', 'file'),
1036
(('1dir', './1dir'),
1037
[('1dir/0file', '0file', 'file'),
1038
('1dir/1dir', '1dir', 'directory'),
1041
(('1dir/1dir', './1dir/1dir'),
1047
found_bzrdir = False
1048
for dirdetail, dirblock in osutils.walkdirs('.'):
1049
if len(dirblock) and dirblock[0][1] == '.bzr':
1050
# this tests the filtering of selected paths
1053
result.append((dirdetail, dirblock))
1055
self.assertTrue(found_bzrdir)
1056
self.assertExpectedBlocks(expected_dirblocks, result)
1057
# you can search a subdir only, with a supplied prefix.
1059
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1060
result.append(dirblock)
1061
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1063
def test_walkdirs_os_error(self):
1064
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1065
# Pyrex readdir didn't raise useful messages if it had an error
1066
# reading the directory
1067
if sys.platform == 'win32':
1068
raise tests.TestNotApplicable(
1069
"readdir IOError not tested on win32")
1070
os.mkdir("test-unreadable")
1071
os.chmod("test-unreadable", 0000)
1072
# must chmod it back so that it can be removed
1073
self.addCleanup(os.chmod, "test-unreadable", 0700)
1074
# The error is not raised until the generator is actually evaluated.
1075
# (It would be ok if it happened earlier but at the moment it
1077
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1078
self.assertEquals('./test-unreadable', e.filename)
1079
self.assertEquals(errno.EACCES, e.errno)
1080
# Ensure the message contains the file name
1081
self.assertContainsRe(str(e), "\./test-unreadable")
1083
def test__walkdirs_utf8(self):
1092
self.build_tree(tree)
1093
expected_dirblocks = [
1095
[('0file', '0file', 'file'),
1096
('1dir', '1dir', 'directory'),
1097
('2file', '2file', 'file'),
1100
(('1dir', './1dir'),
1101
[('1dir/0file', '0file', 'file'),
1102
('1dir/1dir', '1dir', 'directory'),
1105
(('1dir/1dir', './1dir/1dir'),
1111
found_bzrdir = False
1112
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1113
if len(dirblock) and dirblock[0][1] == '.bzr':
1114
# this tests the filtering of selected paths
1117
result.append((dirdetail, dirblock))
1119
self.assertTrue(found_bzrdir)
1120
self.assertExpectedBlocks(expected_dirblocks, result)
1122
# you can search a subdir only, with a supplied prefix.
1124
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1125
result.append(dirblock)
1126
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1128
def _filter_out_stat(self, result):
1129
"""Filter out the stat value from the walkdirs result"""
1130
for dirdetail, dirblock in result:
1132
for info in dirblock:
1133
# Ignore info[3] which is the stat
1134
new_dirblock.append((info[0], info[1], info[2], info[4]))
1135
dirblock[:] = new_dirblock
1137
def _save_platform_info(self):
1138
self.overrideAttr(win32utils, 'winver')
1139
self.overrideAttr(osutils, '_fs_enc')
1140
self.overrideAttr(osutils, '_selected_dir_reader')
1142
def assertDirReaderIs(self, expected):
1143
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1144
# Force it to redetect
1145
osutils._selected_dir_reader = None
1146
# Nothing to list, but should still trigger the selection logic
1147
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1148
self.assertIsInstance(osutils._selected_dir_reader, expected)
1150
def test_force_walkdirs_utf8_fs_utf8(self):
1151
self.requireFeature(UTF8DirReaderFeature)
1152
self._save_platform_info()
1153
win32utils.winver = None # Avoid the win32 detection code
1154
osutils._fs_enc = 'UTF-8'
1155
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1157
def test_force_walkdirs_utf8_fs_ascii(self):
1158
self.requireFeature(UTF8DirReaderFeature)
1159
self._save_platform_info()
1160
win32utils.winver = None # Avoid the win32 detection code
1161
osutils._fs_enc = 'US-ASCII'
1162
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1164
def test_force_walkdirs_utf8_fs_ANSI(self):
1165
self.requireFeature(UTF8DirReaderFeature)
1166
self._save_platform_info()
1167
win32utils.winver = None # Avoid the win32 detection code
1168
osutils._fs_enc = 'ANSI_X3.4-1968'
1169
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1171
def test_force_walkdirs_utf8_fs_latin1(self):
1172
self._save_platform_info()
1173
win32utils.winver = None # Avoid the win32 detection code
1174
osutils._fs_enc = 'latin1'
1175
self.assertDirReaderIs(osutils.UnicodeDirReader)
1177
def test_force_walkdirs_utf8_nt(self):
1178
# Disabled because the thunk of the whole walkdirs api is disabled.
1179
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1180
self._save_platform_info()
1181
win32utils.winver = 'Windows NT'
1182
from bzrlib._walkdirs_win32 import Win32ReadDir
1183
self.assertDirReaderIs(Win32ReadDir)
1185
def test_force_walkdirs_utf8_98(self):
1186
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1187
self._save_platform_info()
1188
win32utils.winver = 'Windows 98'
1189
self.assertDirReaderIs(osutils.UnicodeDirReader)
1191
def test_unicode_walkdirs(self):
1192
"""Walkdirs should always return unicode paths."""
1193
self.requireFeature(tests.UnicodeFilenameFeature)
1194
name0 = u'0file-\xb6'
1195
name1 = u'1dir-\u062c\u0648'
1196
name2 = u'2file-\u0633'
1200
name1 + '/' + name0,
1201
name1 + '/' + name1 + '/',
1204
self.build_tree(tree)
1205
expected_dirblocks = [
1207
[(name0, name0, 'file', './' + name0),
1208
(name1, name1, 'directory', './' + name1),
1209
(name2, name2, 'file', './' + name2),
1212
((name1, './' + name1),
1213
[(name1 + '/' + name0, name0, 'file', './' + name1
1215
(name1 + '/' + name1, name1, 'directory', './' + name1
1219
((name1 + '/' + name1, './' + name1 + '/' + name1),
1224
result = list(osutils.walkdirs('.'))
1225
self._filter_out_stat(result)
1226
self.assertEqual(expected_dirblocks, result)
1227
result = list(osutils.walkdirs(u'./'+name1, name1))
1228
self._filter_out_stat(result)
1229
self.assertEqual(expected_dirblocks[1:], result)
1231
def test_unicode__walkdirs_utf8(self):
1232
"""Walkdirs_utf8 should always return utf8 paths.
1234
The abspath portion might be in unicode or utf-8
1236
self.requireFeature(tests.UnicodeFilenameFeature)
1237
name0 = u'0file-\xb6'
1238
name1 = u'1dir-\u062c\u0648'
1239
name2 = u'2file-\u0633'
1243
name1 + '/' + name0,
1244
name1 + '/' + name1 + '/',
1247
self.build_tree(tree)
1248
name0 = name0.encode('utf8')
1249
name1 = name1.encode('utf8')
1250
name2 = name2.encode('utf8')
1252
expected_dirblocks = [
1254
[(name0, name0, 'file', './' + name0),
1255
(name1, name1, 'directory', './' + name1),
1256
(name2, name2, 'file', './' + name2),
1259
((name1, './' + name1),
1260
[(name1 + '/' + name0, name0, 'file', './' + name1
1262
(name1 + '/' + name1, name1, 'directory', './' + name1
1266
((name1 + '/' + name1, './' + name1 + '/' + name1),
1272
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1273
# all abspaths are Unicode, and encode them back into utf8.
1274
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1275
self.assertIsInstance(dirdetail[0], str)
1276
if isinstance(dirdetail[1], unicode):
1277
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1278
dirblock = [list(info) for info in dirblock]
1279
for info in dirblock:
1280
self.assertIsInstance(info[4], unicode)
1281
info[4] = info[4].encode('utf8')
1283
for info in dirblock:
1284
self.assertIsInstance(info[0], str)
1285
self.assertIsInstance(info[1], str)
1286
self.assertIsInstance(info[4], str)
1287
# Remove the stat information
1288
new_dirblock.append((info[0], info[1], info[2], info[4]))
1289
result.append((dirdetail, new_dirblock))
1290
self.assertEqual(expected_dirblocks, result)
1292
def test__walkdirs_utf8_with_unicode_fs(self):
1293
"""UnicodeDirReader should be a safe fallback everywhere
1295
The abspath portion should be in unicode
1297
self.requireFeature(tests.UnicodeFilenameFeature)
1298
# Use the unicode reader. TODO: split into driver-and-driven unit
1300
self._save_platform_info()
1301
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1302
name0u = u'0file-\xb6'
1303
name1u = u'1dir-\u062c\u0648'
1304
name2u = u'2file-\u0633'
1308
name1u + '/' + name0u,
1309
name1u + '/' + name1u + '/',
1312
self.build_tree(tree)
1313
name0 = name0u.encode('utf8')
1314
name1 = name1u.encode('utf8')
1315
name2 = name2u.encode('utf8')
1317
# All of the abspaths should be in unicode, all of the relative paths
1319
expected_dirblocks = [
1321
[(name0, name0, 'file', './' + name0u),
1322
(name1, name1, 'directory', './' + name1u),
1323
(name2, name2, 'file', './' + name2u),
1326
((name1, './' + name1u),
1327
[(name1 + '/' + name0, name0, 'file', './' + name1u
1329
(name1 + '/' + name1, name1, 'directory', './' + name1u
1333
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1338
result = list(osutils._walkdirs_utf8('.'))
1339
self._filter_out_stat(result)
1340
self.assertEqual(expected_dirblocks, result)
1342
def test__walkdirs_utf8_win32readdir(self):
1343
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1344
self.requireFeature(tests.UnicodeFilenameFeature)
1345
from bzrlib._walkdirs_win32 import Win32ReadDir
1346
self._save_platform_info()
1347
osutils._selected_dir_reader = Win32ReadDir()
1348
name0u = u'0file-\xb6'
1349
name1u = u'1dir-\u062c\u0648'
1350
name2u = u'2file-\u0633'
1354
name1u + '/' + name0u,
1355
name1u + '/' + name1u + '/',
1358
self.build_tree(tree)
1359
name0 = name0u.encode('utf8')
1360
name1 = name1u.encode('utf8')
1361
name2 = name2u.encode('utf8')
1363
# All of the abspaths should be in unicode, all of the relative paths
1365
expected_dirblocks = [
1367
[(name0, name0, 'file', './' + name0u),
1368
(name1, name1, 'directory', './' + name1u),
1369
(name2, name2, 'file', './' + name2u),
1372
((name1, './' + name1u),
1373
[(name1 + '/' + name0, name0, 'file', './' + name1u
1375
(name1 + '/' + name1, name1, 'directory', './' + name1u
1379
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1384
result = list(osutils._walkdirs_utf8(u'.'))
1385
self._filter_out_stat(result)
1386
self.assertEqual(expected_dirblocks, result)
1388
def assertStatIsCorrect(self, path, win32stat):
1389
os_stat = os.stat(path)
1390
self.assertEqual(os_stat.st_size, win32stat.st_size)
1391
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1392
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1393
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1394
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1395
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1396
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1398
def test__walkdirs_utf_win32_find_file_stat_file(self):
1399
"""make sure our Stat values are valid"""
1400
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1401
self.requireFeature(tests.UnicodeFilenameFeature)
1402
from bzrlib._walkdirs_win32 import Win32ReadDir
1403
name0u = u'0file-\xb6'
1404
name0 = name0u.encode('utf8')
1405
self.build_tree([name0u])
1406
# I hate to sleep() here, but I'm trying to make the ctime different
1409
f = open(name0u, 'ab')
1411
f.write('just a small update')
1415
result = Win32ReadDir().read_dir('', u'.')
1417
self.assertEqual((name0, name0, 'file'), entry[:3])
1418
self.assertEqual(u'./' + name0u, entry[4])
1419
self.assertStatIsCorrect(entry[4], entry[3])
1420
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1422
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1423
"""make sure our Stat values are valid"""
1424
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1425
self.requireFeature(tests.UnicodeFilenameFeature)
1426
from bzrlib._walkdirs_win32 import Win32ReadDir
1427
name0u = u'0dir-\u062c\u0648'
1428
name0 = name0u.encode('utf8')
1429
self.build_tree([name0u + '/'])
1431
result = Win32ReadDir().read_dir('', u'.')
1433
self.assertEqual((name0, name0, 'directory'), entry[:3])
1434
self.assertEqual(u'./' + name0u, entry[4])
1435
self.assertStatIsCorrect(entry[4], entry[3])
1437
def assertPathCompare(self, path_less, path_greater):
1438
"""check that path_less and path_greater compare correctly."""
1439
self.assertEqual(0, osutils.compare_paths_prefix_order(
1440
path_less, path_less))
1441
self.assertEqual(0, osutils.compare_paths_prefix_order(
1442
path_greater, path_greater))
1443
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1444
path_less, path_greater))
1445
self.assertEqual(1, osutils.compare_paths_prefix_order(
1446
path_greater, path_less))
1448
def test_compare_paths_prefix_order(self):
1449
# root before all else
1450
self.assertPathCompare("/", "/a")
1451
# alpha within a dir
1452
self.assertPathCompare("/a", "/b")
1453
self.assertPathCompare("/b", "/z")
1454
# high dirs before lower.
1455
self.assertPathCompare("/z", "/a/a")
1456
# except if the deeper dir should be output first
1457
self.assertPathCompare("/a/b/c", "/d/g")
1458
# lexical betwen dirs of the same height
1459
self.assertPathCompare("/a/z", "/z/z")
1460
self.assertPathCompare("/a/c/z", "/a/d/e")
1462
# this should also be consistent for no leading / paths
1463
# root before all else
1464
self.assertPathCompare("", "a")
1465
# alpha within a dir
1466
self.assertPathCompare("a", "b")
1467
self.assertPathCompare("b", "z")
1468
# high dirs before lower.
1469
self.assertPathCompare("z", "a/a")
1470
# except if the deeper dir should be output first
1471
self.assertPathCompare("a/b/c", "d/g")
1472
# lexical betwen dirs of the same height
1473
self.assertPathCompare("a/z", "z/z")
1474
self.assertPathCompare("a/c/z", "a/d/e")
1476
def test_path_prefix_sorting(self):
1477
"""Doing a sort on path prefix should match our sample data."""
1492
dir_sorted_paths = [
1508
sorted(original_paths, key=osutils.path_prefix_key))
1509
# using the comparison routine shoudl work too:
1512
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1515
class TestCopyTree(tests.TestCaseInTempDir):
1517
def test_copy_basic_tree(self):
1518
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1519
osutils.copy_tree('source', 'target')
1520
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1521
self.assertEqual(['c'], os.listdir('target/b'))
1523
def test_copy_tree_target_exists(self):
1524
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1526
osutils.copy_tree('source', 'target')
1527
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1528
self.assertEqual(['c'], os.listdir('target/b'))
1530
def test_copy_tree_symlinks(self):
1531
self.requireFeature(tests.SymlinkFeature)
1532
self.build_tree(['source/'])
1533
os.symlink('a/generic/path', 'source/lnk')
1534
osutils.copy_tree('source', 'target')
1535
self.assertEqual(['lnk'], os.listdir('target'))
1536
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1538
def test_copy_tree_handlers(self):
1539
processed_files = []
1540
processed_links = []
1541
def file_handler(from_path, to_path):
1542
processed_files.append(('f', from_path, to_path))
1543
def dir_handler(from_path, to_path):
1544
processed_files.append(('d', from_path, to_path))
1545
def link_handler(from_path, to_path):
1546
processed_links.append((from_path, to_path))
1547
handlers = {'file':file_handler,
1548
'directory':dir_handler,
1549
'symlink':link_handler,
1552
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1553
if osutils.has_symlinks():
1554
os.symlink('a/generic/path', 'source/lnk')
1555
osutils.copy_tree('source', 'target', handlers=handlers)
1557
self.assertEqual([('d', 'source', 'target'),
1558
('f', 'source/a', 'target/a'),
1559
('d', 'source/b', 'target/b'),
1560
('f', 'source/b/c', 'target/b/c'),
1562
self.failIfExists('target')
1563
if osutils.has_symlinks():
1564
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1567
class TestSetUnsetEnv(tests.TestCase):
1568
"""Test updating the environment"""
1571
super(TestSetUnsetEnv, self).setUp()
1573
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1574
'Environment was not cleaned up properly.'
1575
' Variable BZR_TEST_ENV_VAR should not exist.')
1577
if 'BZR_TEST_ENV_VAR' in os.environ:
1578
del os.environ['BZR_TEST_ENV_VAR']
1579
self.addCleanup(cleanup)
1582
"""Test that we can set an env variable"""
1583
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1584
self.assertEqual(None, old)
1585
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1587
def test_double_set(self):
1588
"""Test that we get the old value out"""
1589
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1590
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1591
self.assertEqual('foo', old)
1592
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1594
def test_unicode(self):
1595
"""Environment can only contain plain strings
1597
So Unicode strings must be encoded.
1599
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1601
raise tests.TestSkipped(
1602
'Cannot find a unicode character that works in encoding %s'
1603
% (osutils.get_user_encoding(),))
1605
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1606
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1608
def test_unset(self):
1609
"""Test that passing None will remove the env var"""
1610
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1611
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1612
self.assertEqual('foo', old)
1613
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1614
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1617
class TestSizeShaFile(tests.TestCaseInTempDir):
1619
def test_sha_empty(self):
1620
self.build_tree_contents([('foo', '')])
1621
expected_sha = osutils.sha_string('')
1623
self.addCleanup(f.close)
1624
size, sha = osutils.size_sha_file(f)
1625
self.assertEqual(0, size)
1626
self.assertEqual(expected_sha, sha)
1628
def test_sha_mixed_endings(self):
1629
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1630
self.build_tree_contents([('foo', text)])
1631
expected_sha = osutils.sha_string(text)
1632
f = open('foo', 'rb')
1633
self.addCleanup(f.close)
1634
size, sha = osutils.size_sha_file(f)
1635
self.assertEqual(38, size)
1636
self.assertEqual(expected_sha, sha)
1639
class TestShaFileByName(tests.TestCaseInTempDir):
1641
def test_sha_empty(self):
1642
self.build_tree_contents([('foo', '')])
1643
expected_sha = osutils.sha_string('')
1644
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1646
def test_sha_mixed_endings(self):
1647
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1648
self.build_tree_contents([('foo', text)])
1649
expected_sha = osutils.sha_string(text)
1650
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1653
class TestResourceLoading(tests.TestCaseInTempDir):
1655
def test_resource_string(self):
1656
# test resource in bzrlib
1657
text = osutils.resource_string('bzrlib', 'debug.py')
1658
self.assertContainsRe(text, "debug_flags = set()")
1659
# test resource under bzrlib
1660
text = osutils.resource_string('bzrlib.ui', 'text.py')
1661
self.assertContainsRe(text, "class TextUIFactory")
1662
# test unsupported package
1663
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1665
# test unknown resource
1666
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1669
class TestReCompile(tests.TestCase):
1671
def test_re_compile_checked(self):
1672
r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1673
self.assertTrue(r.match('aaaa'))
1674
self.assertTrue(r.match('aAaA'))
1676
def test_re_compile_checked_error(self):
1677
# like https://bugs.launchpad.net/bzr/+bug/251352
1678
err = self.assertRaises(
1679
errors.BzrCommandError,
1680
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1682
"Invalid regular expression in test case: '*': "
1683
"nothing to repeat",
1687
class TestDirReader(tests.TestCaseInTempDir):
1690
_dir_reader_class = None
1691
_native_to_unicode = None
1694
tests.TestCaseInTempDir.setUp(self)
1695
self.overrideAttr(osutils,
1696
'_selected_dir_reader', self._dir_reader_class())
1698
def _get_ascii_tree(self):
1706
expected_dirblocks = [
1708
[('0file', '0file', 'file'),
1709
('1dir', '1dir', 'directory'),
1710
('2file', '2file', 'file'),
1713
(('1dir', './1dir'),
1714
[('1dir/0file', '0file', 'file'),
1715
('1dir/1dir', '1dir', 'directory'),
1718
(('1dir/1dir', './1dir/1dir'),
1723
return tree, expected_dirblocks
1725
def test_walk_cur_dir(self):
1726
tree, expected_dirblocks = self._get_ascii_tree()
1727
self.build_tree(tree)
1728
result = list(osutils._walkdirs_utf8('.'))
1729
# Filter out stat and abspath
1730
self.assertEqual(expected_dirblocks,
1731
[(dirinfo, [line[0:3] for line in block])
1732
for dirinfo, block in result])
1734
def test_walk_sub_dir(self):
1735
tree, expected_dirblocks = self._get_ascii_tree()
1736
self.build_tree(tree)
1737
# you can search a subdir only, with a supplied prefix.
1738
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1739
# Filter out stat and abspath
1740
self.assertEqual(expected_dirblocks[1:],
1741
[(dirinfo, [line[0:3] for line in block])
1742
for dirinfo, block in result])
1744
def _get_unicode_tree(self):
1745
name0u = u'0file-\xb6'
1746
name1u = u'1dir-\u062c\u0648'
1747
name2u = u'2file-\u0633'
1751
name1u + '/' + name0u,
1752
name1u + '/' + name1u + '/',
1755
name0 = name0u.encode('UTF-8')
1756
name1 = name1u.encode('UTF-8')
1757
name2 = name2u.encode('UTF-8')
1758
expected_dirblocks = [
1760
[(name0, name0, 'file', './' + name0u),
1761
(name1, name1, 'directory', './' + name1u),
1762
(name2, name2, 'file', './' + name2u),
1765
((name1, './' + name1u),
1766
[(name1 + '/' + name0, name0, 'file', './' + name1u
1768
(name1 + '/' + name1, name1, 'directory', './' + name1u
1772
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1777
return tree, expected_dirblocks
1779
def _filter_out(self, raw_dirblocks):
1780
"""Filter out a walkdirs_utf8 result.
1782
stat field is removed, all native paths are converted to unicode
1784
filtered_dirblocks = []
1785
for dirinfo, block in raw_dirblocks:
1786
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1789
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1790
filtered_dirblocks.append((dirinfo, details))
1791
return filtered_dirblocks
1793
def test_walk_unicode_tree(self):
1794
self.requireFeature(tests.UnicodeFilenameFeature)
1795
tree, expected_dirblocks = self._get_unicode_tree()
1796
self.build_tree(tree)
1797
result = list(osutils._walkdirs_utf8('.'))
1798
self.assertEqual(expected_dirblocks, self._filter_out(result))
1800
def test_symlink(self):
1801
self.requireFeature(tests.SymlinkFeature)
1802
self.requireFeature(tests.UnicodeFilenameFeature)
1803
target = u'target\N{Euro Sign}'
1804
link_name = u'l\N{Euro Sign}nk'
1805
os.symlink(target, link_name)
1806
target_utf8 = target.encode('UTF-8')
1807
link_name_utf8 = link_name.encode('UTF-8')
1808
expected_dirblocks = [
1810
[(link_name_utf8, link_name_utf8,
1811
'symlink', './' + link_name),],
1813
result = list(osutils._walkdirs_utf8('.'))
1814
self.assertEqual(expected_dirblocks, self._filter_out(result))
1817
class TestReadLink(tests.TestCaseInTempDir):
1818
"""Exposes os.readlink() problems and the osutils solution.
1820
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1821
unicode string will be returned if a unicode string is passed.
1823
But prior python versions failed to properly encode the passed unicode
1826
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1829
super(tests.TestCaseInTempDir, self).setUp()
1830
self.link = u'l\N{Euro Sign}ink'
1831
self.target = u'targe\N{Euro Sign}t'
1832
os.symlink(self.target, self.link)
1834
def test_os_readlink_link_encoding(self):
1835
if sys.version_info < (2, 6):
1836
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1838
self.assertEquals(self.target, os.readlink(self.link))
1840
def test_os_readlink_link_decoding(self):
1841
self.assertEquals(self.target.encode(osutils._fs_enc),
1842
os.readlink(self.link.encode(osutils._fs_enc)))
1845
class TestConcurrency(tests.TestCase):
1848
super(TestConcurrency, self).setUp()
1849
self.overrideAttr(osutils, '_cached_local_concurrency')
1851
def test_local_concurrency(self):
1852
concurrency = osutils.local_concurrency()
1853
self.assertIsInstance(concurrency, int)
1855
def test_local_concurrency_environment_variable(self):
1856
os.environ['BZR_CONCURRENCY'] = '2'
1857
self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1858
os.environ['BZR_CONCURRENCY'] = '3'
1859
self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1860
os.environ['BZR_CONCURRENCY'] = 'foo'
1861
self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1863
def test_option_concurrency(self):
1864
os.environ['BZR_CONCURRENCY'] = '1'
1865
self.run_bzr('rocks --concurrency 42')
1866
# Command line overrides envrionment variable
1867
self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1868
self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1871
class TestFailedToLoadExtension(tests.TestCase):
1873
def _try_loading(self):
1875
import bzrlib._fictional_extension_py
1876
except ImportError, e:
1877
osutils.failed_to_load_extension(e)
1881
super(TestFailedToLoadExtension, self).setUp()
1882
self.overrideAttr(osutils, '_extension_load_failures', [])
1884
def test_failure_to_load(self):
1886
self.assertLength(1, osutils._extension_load_failures)
1887
self.assertEquals(osutils._extension_load_failures[0],
1888
"No module named _fictional_extension_py")
1890
def test_report_extension_load_failures_no_warning(self):
1891
self.assertTrue(self._try_loading())
1892
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1893
# it used to give a Python warning; it no longer does
1894
self.assertLength(0, warnings)
1896
def test_report_extension_load_failures_message(self):
1898
trace.push_log_file(log)
1899
self.assertTrue(self._try_loading())
1900
osutils.report_extension_load_failures()
1901
self.assertContainsRe(
1903
r"bzr: warning: some compiled extensions could not be loaded; "
1904
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1908
class TestTerminalWidth(tests.TestCase):
1910
def replace_stdout(self, new):
1911
self.overrideAttr(sys, 'stdout', new)
1913
def replace__terminal_size(self, new):
1914
self.overrideAttr(osutils, '_terminal_size', new)
1916
def set_fake_tty(self):
1918
class I_am_a_tty(object):
1922
self.replace_stdout(I_am_a_tty())
1924
def test_default_values(self):
1925
self.assertEqual(80, osutils.default_terminal_width)
1927
def test_defaults_to_BZR_COLUMNS(self):
1928
# BZR_COLUMNS is set by the test framework
1929
self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1930
os.environ['BZR_COLUMNS'] = '12'
1931
self.assertEqual(12, osutils.terminal_width())
1933
def test_falls_back_to_COLUMNS(self):
1934
del os.environ['BZR_COLUMNS']
1935
self.assertNotEqual('42', os.environ['COLUMNS'])
1937
os.environ['COLUMNS'] = '42'
1938
self.assertEqual(42, osutils.terminal_width())
1940
def test_tty_default_without_columns(self):
1941
del os.environ['BZR_COLUMNS']
1942
del os.environ['COLUMNS']
1944
def terminal_size(w, h):
1948
# We need to override the osutils definition as it depends on the
1949
# running environment that we can't control (PQM running without a
1950
# controlling terminal is one example).
1951
self.replace__terminal_size(terminal_size)
1952
self.assertEqual(42, osutils.terminal_width())
1954
def test_non_tty_default_without_columns(self):
1955
del os.environ['BZR_COLUMNS']
1956
del os.environ['COLUMNS']
1957
self.replace_stdout(None)
1958
self.assertEqual(None, osutils.terminal_width())
1960
def test_no_TIOCGWINSZ(self):
1961
self.requireFeature(term_ios_feature)
1962
termios = term_ios_feature.module
1963
# bug 63539 is about a termios without TIOCGWINSZ attribute
1965
orig = termios.TIOCGWINSZ
1966
except AttributeError:
1967
# We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
1970
self.overrideAttr(termios, 'TIOCGWINSZ')
1971
del termios.TIOCGWINSZ
1972
del os.environ['BZR_COLUMNS']
1973
del os.environ['COLUMNS']
1974
# Whatever the result is, if we don't raise an exception, it's ok.
1975
osutils.terminal_width()