1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
# Shared feature instance; dir_reader_scenarios() checks its available()
# before adding the UTF8DirReader scenario.
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
# Feature object built for the 'termios' module (presumably reports whether
# that module is importable -- confirm against tests.ModuleAvailableFeature).
term_ios_feature = tests.ModuleAvailableFeature('termios')
59
def _already_unicode(s):
63
def _utf8_to_unicode(s):
64
return s.decode('UTF-8')
67
def dir_reader_scenarios():
68
# For each dir reader we define:
70
# - native_to_unicode: a function converting the native_abspath as returned
71
# by DirReader.read_dir to its unicode representation
73
# UnicodeDirReader is the fallback, it should be tested on all platforms.
74
scenarios = [('unicode',
75
dict(_dir_reader_class=osutils.UnicodeDirReader,
76
_native_to_unicode=_already_unicode))]
77
# Some DirReaders are platform specific and even there they may not be
79
if UTF8DirReaderFeature.available():
80
from bzrlib import _readdir_pyx
81
scenarios.append(('utf8',
82
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
83
_native_to_unicode=_utf8_to_unicode)))
85
if test__walkdirs_win32.win32_readdir_feature.available():
87
from bzrlib import _walkdirs_win32
90
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
91
_native_to_unicode=_already_unicode)))
97
def load_tests(basic_tests, module, loader):
98
suite = loader.suiteClass()
99
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
100
basic_tests, tests.condition_isinstance(TestDirReader))
101
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
102
suite.addTest(remaining_tests)
106
class TestContainsWhitespace(tests.TestCase):
    """Tests for osutils.contains_whitespace()."""

    def test_contains_whitespace(self):
        """Check which strings contains_whitespace() reports as whitespace.

        Uses assertTrue/assertFalse instead of the deprecated
        failUnless/failIf aliases.
        """
        self.assertTrue(osutils.contains_whitespace(u' '))
        self.assertTrue(osutils.contains_whitespace(u'hello there'))
        self.assertTrue(osutils.contains_whitespace(u'hellothere\n'))
        self.assertTrue(osutils.contains_whitespace(u'hello\nthere'))
        self.assertTrue(osutils.contains_whitespace(u'hello\rthere'))
        self.assertTrue(osutils.contains_whitespace(u'hello\tthere'))

        # \xa0 is "Non-breaking-space", which some python locales treat as
        # whitespace, but we do not.
        self.assertFalse(osutils.contains_whitespace(u''))
        self.assertFalse(osutils.contains_whitespace(u'hellothere'))
        self.assertFalse(osutils.contains_whitespace(u'hello\xa0there'))
123
class TestRename(tests.TestCaseInTempDir):
125
def create_file(self, filename, content):
126
f = open(filename, 'wb')
132
def _fancy_rename(self, a, b):
    """Call osutils.fancy_rename(a, b) with os.rename and os.unlink."""
    primitives = dict(rename_func=os.rename, unlink_func=os.unlink)
    osutils.fancy_rename(a, b, **primitives)
136
def test_fancy_rename(self):
137
# This should work everywhere
138
self.create_file('a', 'something in a\n')
139
self._fancy_rename('a', 'b')
140
self.failIfExists('a')
141
self.failUnlessExists('b')
142
self.check_file_contents('b', 'something in a\n')
144
self.create_file('a', 'new something in a\n')
145
self._fancy_rename('b', 'a')
147
self.check_file_contents('a', 'something in a\n')
149
def test_fancy_rename_fails_source_missing(self):
    """A missing source raises and the existing target is left in place."""
    target_content = 'data in target\n'
    rename_errors = (IOError, OSError)
    self.create_file('target', target_content)
    self.assertRaises(rename_errors, self._fancy_rename,
                      'missingsource', 'target')
    # The failed rename must not have disturbed the target.
    self.failUnlessExists('target')
    self.check_file_contents('target', target_content)
157
def test_fancy_rename_fails_if_source_and_target_missing(self):
    """The rename raises when neither source nor target exists."""
    rename_errors = (IOError, OSError)
    self.assertRaises(rename_errors, self._fancy_rename,
                      'missingsource', 'missingtarget')
161
def test_rename(self):
162
# Rename should be semi-atomic on all platforms
163
self.create_file('a', 'something in a\n')
164
osutils.rename('a', 'b')
165
self.failIfExists('a')
166
self.failUnlessExists('b')
167
self.check_file_contents('b', 'something in a\n')
169
self.create_file('a', 'new something in a\n')
170
osutils.rename('b', 'a')
172
self.check_file_contents('a', 'something in a\n')
174
# TODO: test fancy_rename using a MemoryTransport
176
def test_rename_change_case(self):
    """osutils.rename() can change only the case of a file or directory."""
    # on Windows we should be able to change filename case by rename
    self.build_tree(['a', 'b/'])
    osutils.rename('a', 'A')
    osutils.rename('b', 'B')
    # we can't use failUnlessExists on case-insensitive filesystem
    # so try to check shape of the tree
    shape = sorted(os.listdir('.'))
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(['A', 'B'], shape)
187
class TestRandChars(tests.TestCase):
    """Tests for osutils.rand_chars()."""

    def test_01_rand_chars_empty(self):
        """Requesting zero characters gives the empty string."""
        chars = osutils.rand_chars(0)
        self.assertEqual(chars, '')

    def test_02_rand_chars_100(self):
        """Requesting 100 characters gives a 100 character [a-z0-9] str."""
        chars = osutils.rand_chars(100)
        length = len(chars)
        self.assertEqual(length, 100)
        kind = type(chars)
        self.assertEqual(kind, str)
        self.assertContainsRe(chars, r'^[a-z0-9]{100}$')
200
class TestIsInside(tests.TestCase):
202
def test_is_inside(self):
    """Check is_inside() on prefix, identical and empty path pairs."""
    # (expected result, directory, filename), checked in this order.
    cases = [
        (True, 'src', 'src/foo.c'),
        (False, 'src', 'srccontrol'),
        (True, 'src', 'src/a/a/a/foo.c'),
        (True, 'foo.c', 'foo.c'),
        (False, 'foo.c', ''),
        (True, '', 'foo.c'),
        ]
    for expected, dirname, fname in cases:
        outcome = osutils.is_inside(dirname, fname)
        if expected:
            self.assertTrue(outcome)
        else:
            self.assertFalse(outcome)
211
def test_is_inside_any(self):
212
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
213
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
214
(['src'], SRC_FOO_C),
217
self.assert_(osutils.is_inside_any(dirs, fn))
218
for dirs, fn in [(['src'], 'srccontrol'),
219
(['src'], 'srccontrol/foo')]:
220
self.assertFalse(osutils.is_inside_any(dirs, fn))
222
def test_is_inside_or_parent_of_any(self):
223
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
224
(['src'], 'src/foo.c'),
225
(['src/bar.c'], 'src'),
226
(['src/bar.c', 'bla/foo.c'], 'src'),
229
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
231
for dirs, fn in [(['src'], 'srccontrol'),
232
(['srccontrol/foo.c'], 'src'),
233
(['src'], 'srccontrol/foo')]:
234
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
237
class TestRmTree(tests.TestCaseInTempDir):
239
def test_rmtree(self):
240
# Check to remove tree with read-only files/dirs
242
f = file('dir/file', 'w')
245
# would like to also try making the directory readonly, but at the
246
# moment python shutil.rmtree doesn't handle that properly - it would
247
# need to chmod the directory before removing things inside it - deferred
248
# for now -- mbp 20060505
249
# osutils.make_readonly('dir')
250
osutils.make_readonly('dir/file')
252
osutils.rmtree('dir')
254
self.failIfExists('dir/file')
255
self.failIfExists('dir')
258
class TestDeleteAny(tests.TestCaseInTempDir):
    """Tests for osutils.delete_any()."""

    def test_delete_any_readonly(self):
        """delete_any() is called on a read-only file and directory."""
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
        self.build_tree(['d/', 'f'])
        for path in ('d', 'f'):
            osutils.make_readonly(path)

        # The file goes first, then the directory.
        for path in ('f', 'd'):
            osutils.delete_any(path)
270
class TestKind(tests.TestCaseInTempDir):
272
def test_file_kind(self):
273
self.build_tree(['file', 'dir/'])
274
self.assertEquals('file', osutils.file_kind('file'))
275
self.assertEquals('directory', osutils.file_kind('dir/'))
276
if osutils.has_symlinks():
277
os.symlink('symlink', 'symlink')
278
self.assertEquals('symlink', osutils.file_kind('symlink'))
280
# TODO: jam 20060529 Test a block device
282
os.lstat('/dev/null')
284
if e.errno not in (errno.ENOENT,):
287
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
289
mkfifo = getattr(os, 'mkfifo', None)
293
self.assertEquals('fifo', osutils.file_kind('fifo'))
297
AF_UNIX = getattr(socket, 'AF_UNIX', None)
299
s = socket.socket(AF_UNIX)
302
self.assertEquals('socket', osutils.file_kind('socket'))
306
def test_kind_marker(self):
    """kind_marker() maps each known kind to its marker, else raises."""
    def check(marker, kind):
        self.assertEqual(marker, osutils.kind_marker(kind))

    check("", "file")
    check("/", 'directory')
    check("/", osutils._directory_kind)
    check("@", "symlink")
    check("+", "tree-reference")
    # An unrecognised kind is an error rather than an empty marker.
    self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
315
class TestUmask(tests.TestCaseInTempDir):
317
def test_get_umask(self):
318
if sys.platform == 'win32':
319
# umask always returns '0', no way to set it
320
self.assertEqual(0, osutils.get_umask())
323
orig_umask = osutils.get_umask()
324
self.addCleanup(os.umask, orig_umask)
326
self.assertEqual(0222, osutils.get_umask())
328
self.assertEqual(0022, osutils.get_umask())
330
self.assertEqual(0002, osutils.get_umask())
332
self.assertEqual(0027, osutils.get_umask())
335
class TestDateTime(tests.TestCase):
337
def assertFormatedDelta(self, expected, seconds):
    """Assert that osutils.format_delta(seconds) equals expected."""
    self.assertEqual(expected, osutils.format_delta(seconds))
342
def test_format_delta(self):
    """Check format_delta() output for past and future deltas."""
    past = [
        ('0 seconds ago', 0),
        ('1 second ago', 1),
        ('10 seconds ago', 10),
        ('59 seconds ago', 59),
        ('89 seconds ago', 89),
        ('1 minute, 30 seconds ago', 90),
        ('3 minutes, 0 seconds ago', 180),
        ('3 minutes, 1 second ago', 181),
        ('10 minutes, 15 seconds ago', 615),
        ('30 minutes, 59 seconds ago', 1859),
        ('31 minutes, 0 seconds ago', 1860),
        ('60 minutes, 0 seconds ago', 3600),
        ('89 minutes, 59 seconds ago', 5399),
        ('1 hour, 30 minutes ago', 5400),
        ('2 hours, 30 minutes ago', 9017),
        ('10 hours, 0 minutes ago', 36000),
        ('24 hours, 0 minutes ago', 86400),
        ('35 hours, 59 minutes ago', 129599),
        ('36 hours, 0 minutes ago', 129600),
        ('36 hours, 0 minutes ago', 129601),
        ('36 hours, 1 minute ago', 129660),
        ('36 hours, 1 minute ago', 129661),
        ('84 hours, 10 minutes ago', 303002),
        ]
    for text, delta in past:
        self.assertFormatedDelta(text, delta)

    # We handle when time steps the wrong direction because computers
    # don't have synchronized clocks.
    future = [
        ('84 hours, 10 minutes in the future', -303002),
        ('1 second in the future', -1),
        ('2 seconds in the future', -2),
        ]
    for text, delta in future:
        self.assertFormatedDelta(text, delta)
373
def test_format_date(self):
    """Check format_date() timezone validation and the return types."""
    self.assertRaises(errors.UnsupportedTimezoneFormat,
                      osutils.format_date, 0, timezone='foo')
    plain = osutils.format_date(0)
    self.assertIsInstance(plain, str)
    localized = osutils.format_local_date(0)
    self.assertIsInstance(localized, unicode)
    # Testing for the actual value of the local weekday without
    # duplicating the code from format_date is difficult.
    # Instead blackbox.test_locale should check for localized
    # dates once they do occur in output strings.
383
def test_format_date_with_offset_in_original_timezone(self):
    """Check the formatted text for timestamps with and without an offset."""
    format_date = osutils.format_date_with_offset_in_original_timezone
    # (expected text, positional arguments), checked in this order.
    cases = [
        ("Thu 1970-01-01 00:00:00 +0000", (0,)),
        ("Fri 1970-01-02 03:46:40 +0000", (100000,)),
        ("Fri 1970-01-02 05:46:40 +0200", (100000, 7200)),
        ]
    for text, args in cases:
        self.assertEqual(text, format_date(*args))
391
def test_local_time_offset(self):
392
"""Test that local_time_offset() returns a sane value."""
393
offset = osutils.local_time_offset()
394
self.assertTrue(isinstance(offset, int))
395
# Test that the offset is no more than eighteen hours in
397
# Time zone handling is system specific, so it is difficult to
398
# do more specific tests, but a value outside of this range is
400
eighteen_hours = 18 * 3600
401
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
403
def test_local_time_offset_with_timestamp(self):
    """Test that local_time_offset() works with a timestamp."""
    offset = osutils.local_time_offset(1000000000.1234567)
    # assertIsInstance reports the offending value on failure, unlike
    # assertTrue(isinstance(...)).
    self.assertIsInstance(offset, int)
    # The offset is expected to lie strictly within eighteen hours of zero.
    eighteen_hours = 18 * 3600
    self.assertTrue(-eighteen_hours < offset < eighteen_hours)
411
class TestLinks(tests.TestCaseInTempDir):
413
def test_dereference_path(self):
414
self.requireFeature(tests.SymlinkFeature)
415
cwd = osutils.realpath('.')
417
bar_path = osutils.pathjoin(cwd, 'bar')
418
# Using './' to avoid bug #1213894 (first path component not
419
# dereferenced) in Python 2.4.1 and earlier
420
self.assertEqual(bar_path, osutils.realpath('./bar'))
421
os.symlink('bar', 'foo')
422
self.assertEqual(bar_path, osutils.realpath('./foo'))
424
# Does not dereference terminal symlinks
425
foo_path = osutils.pathjoin(cwd, 'foo')
426
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
428
# Dereferences parent symlinks
430
baz_path = osutils.pathjoin(bar_path, 'baz')
431
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
433
# Dereferences parent symlinks that are the first path element
434
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
436
# Dereferences parent symlinks in absolute paths
437
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
438
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
440
def test_changing_access(self):
441
f = file('file', 'w')
445
# Make a file readonly
446
osutils.make_readonly('file')
447
mode = os.lstat('file').st_mode
448
self.assertEqual(mode, mode & 0777555)
450
# Make a file writable
451
osutils.make_writable('file')
452
mode = os.lstat('file').st_mode
453
self.assertEqual(mode, mode | 0200)
455
if osutils.has_symlinks():
456
# should not error when handed a symlink
457
os.symlink('nonexistent', 'dangling')
458
osutils.make_readonly('dangling')
459
osutils.make_writable('dangling')
461
def test_host_os_dereferences_symlinks(self):
    """Smoke test: host_os_dereferences_symlinks() is simply called."""
    probe = osutils.host_os_dereferences_symlinks
    probe()
465
class TestCanonicalRelPath(tests.TestCaseInTempDir):
467
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
469
def test_canonical_relpath_simple(self):
470
f = file('MixedCaseName', 'w')
472
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
473
self.failUnlessEqual('work/MixedCaseName', actual)
475
def test_canonical_relpath_missing_tail(self):
    """The parent's on-disk case is used even when the child is missing."""
    os.mkdir('MixedCaseParent')
    actual = osutils.canonical_relpath(self.test_base_dir,
                                       'mixedcaseparent/nochild')
    # assertEqual rather than the deprecated failUnlessEqual alias.
    self.assertEqual('work/MixedCaseParent/nochild', actual)
482
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
484
def assertRelpath(self, expected, base, path):
    """Assert that _cicp_canonical_relpath(base, path) equals expected."""
    self.assertEqual(expected, osutils._cicp_canonical_relpath(base, path))
488
def test_simple(self):
    """A differently-cased name resolves to the on-disk name."""
    self.build_tree(['MixedCaseName'])
    local_root = self.get_transport('.').local_abspath('.')
    base = osutils.realpath(local_root)
    self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
493
def test_subdir_missing_tail(self):
    """Check existing, differently-cased and missing children of a subdir."""
    self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
    local_root = self.get_transport('.').local_abspath('.')
    base = osutils.realpath(local_root)
    # (expected relpath, path handed in), checked in this order.
    cases = [
        ('MixedCaseParent/a_child', 'MixedCaseParent/a_child'),
        ('MixedCaseParent/a_child', 'MixedCaseParent/A_Child'),
        ('MixedCaseParent/not_child', 'MixedCaseParent/not_child'),
        ]
    for expected, path in cases:
        self.assertRelpath(expected, base, path)
503
def test_at_root_slash(self):
504
# We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
506
if osutils.MIN_ABS_PATHLENGTH > 1:
507
raise tests.TestSkipped('relpath requires %d chars'
508
% osutils.MIN_ABS_PATHLENGTH)
509
self.assertRelpath('foo', '/', '/foo')
511
def test_at_root_drive(self):
512
if sys.platform != 'win32':
513
raise tests.TestNotApplicable('we can only test drive-letter relative'
514
' paths on Windows where we have drive'
517
# The specific issue is that when at the root of a drive, 'abspath'
518
# returns "C:/" or just "/". However, the code assumes that abspath
519
# always returns something like "C:/foo" or "/foo" (no trailing slash).
520
self.assertRelpath('foo', 'C:/', 'C:/foo')
521
self.assertRelpath('foo', 'X:/', 'X:/foo')
522
self.assertRelpath('foo', 'X:/', 'X://foo')
525
class TestPumpFile(tests.TestCase):
526
"""Test pumpfile method."""
529
tests.TestCase.setUp(self)
530
# create a test datablock
531
self.block_size = 512
532
pattern = '0123456789ABCDEF'
533
self.test_data = pattern * (3 * self.block_size / len(pattern))
534
self.test_data_len = len(self.test_data)
536
def test_bracket_block_size(self):
537
"""Read data in blocks with the requested read size bracketing the
539
# make sure test data is larger than max read size
540
self.assertTrue(self.test_data_len > self.block_size)
542
from_file = file_utils.FakeReadFile(self.test_data)
545
# read (max / 2) bytes and verify read size wasn't affected
546
num_bytes_to_read = self.block_size / 2
547
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
548
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
549
self.assertEqual(from_file.get_read_count(), 1)
551
# read (max) bytes and verify read size wasn't affected
552
num_bytes_to_read = self.block_size
553
from_file.reset_read_count()
554
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
555
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
556
self.assertEqual(from_file.get_read_count(), 1)
558
# read (max + 1) bytes and verify read size was limited
559
num_bytes_to_read = self.block_size + 1
560
from_file.reset_read_count()
561
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
562
self.assertEqual(from_file.get_max_read_size(), self.block_size)
563
self.assertEqual(from_file.get_read_count(), 2)
565
# finish reading the rest of the data
566
num_bytes_to_read = self.test_data_len - to_file.tell()
567
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
569
# report error if the data wasn't equal (we only report the size due
570
# to the length of the data)
571
response_data = to_file.getvalue()
572
if response_data != self.test_data:
573
message = "Data not equal. Expected %d bytes, received %d."
574
self.fail(message % (len(response_data), self.test_data_len))
576
def test_specified_size(self):
577
"""Request a transfer larger than the maximum block size and verify
578
that the maximum read doesn't exceed the block_size."""
579
# make sure test data is larger than max read size
580
self.assertTrue(self.test_data_len > self.block_size)
582
# retrieve data in blocks
583
from_file = file_utils.FakeReadFile(self.test_data)
585
osutils.pumpfile(from_file, to_file, self.test_data_len,
588
# verify read size was equal to the maximum read size
589
self.assertTrue(from_file.get_max_read_size() > 0)
590
self.assertEqual(from_file.get_max_read_size(), self.block_size)
591
self.assertEqual(from_file.get_read_count(), 3)
593
# report error if the data wasn't equal (we only report the size due
594
# to the length of the data)
595
response_data = to_file.getvalue()
596
if response_data != self.test_data:
597
message = "Data not equal. Expected %d bytes, received %d."
598
self.fail(message % (len(response_data), self.test_data_len))
600
def test_to_eof(self):
601
"""Read to end-of-file and verify that the reads are not larger than
602
the maximum read size."""
603
# make sure test data is larger than max read size
604
self.assertTrue(self.test_data_len > self.block_size)
606
# retrieve data to EOF
607
from_file = file_utils.FakeReadFile(self.test_data)
609
osutils.pumpfile(from_file, to_file, -1, self.block_size)
611
# verify read size was equal to the maximum read size
612
self.assertEqual(from_file.get_max_read_size(), self.block_size)
613
self.assertEqual(from_file.get_read_count(), 4)
615
# report error if the data wasn't equal (we only report the size due
616
# to the length of the data)
617
response_data = to_file.getvalue()
618
if response_data != self.test_data:
619
message = "Data not equal. Expected %d bytes, received %d."
620
self.fail(message % (len(response_data), self.test_data_len))
622
def test_defaults(self):
623
"""Verifies that the default arguments will read to EOF -- this
624
test verifies that any existing usages of pumpfile will not be broken
625
with this new version."""
626
# retrieve data using default (old) pumpfile method
627
from_file = file_utils.FakeReadFile(self.test_data)
629
osutils.pumpfile(from_file, to_file)
631
# report error if the data wasn't equal (we only report the size due
632
# to the length of the data)
633
response_data = to_file.getvalue()
634
if response_data != self.test_data:
635
message = "Data not equal. Expected %d bytes, received %d."
636
self.fail(message % (len(response_data), self.test_data_len))
638
def test_report_activity(self):
640
def log_activity(length, direction):
641
activity.append((length, direction))
642
from_file = StringIO(self.test_data)
644
osutils.pumpfile(from_file, to_file, buff_size=500,
645
report_activity=log_activity, direction='read')
646
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
647
(36, 'read')], activity)
649
from_file = StringIO(self.test_data)
652
osutils.pumpfile(from_file, to_file, buff_size=500,
653
report_activity=log_activity, direction='write')
654
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
655
(36, 'write')], activity)
657
# And with a limited amount of data
658
from_file = StringIO(self.test_data)
661
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
662
report_activity=log_activity, direction='read')
663
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
667
class TestPumpStringFile(tests.TestCase):
669
def test_empty(self):
671
osutils.pump_string_file("", output)
672
self.assertEqual("", output.getvalue())
674
def test_more_than_segment_size(self):
676
osutils.pump_string_file("123456789", output, 2)
677
self.assertEqual("123456789", output.getvalue())
679
def test_segment_size(self):
681
osutils.pump_string_file("12", output, 2)
682
self.assertEqual("12", output.getvalue())
684
def test_segment_size_multiple(self):
686
osutils.pump_string_file("1234", output, 2)
687
self.assertEqual("1234", output.getvalue())
690
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath()."""

    def test_simple_relpath(self):
        """A direct child of the base gives a one-element relpath."""
        base = osutils.getcwd()
        child = base + '/subdir'
        self.assertEqual('subdir', osutils.relpath(base, child))

    def test_deep_relpath(self):
        """A nested child of the base gives a multi-element relpath."""
        base = osutils.getcwd()
        child = base + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(base, child))

    def test_not_relative(self):
        """Paths with different drive prefixes raise PathNotChild."""
        for base in ('C:/path', 'C:/'):
            self.assertRaises(errors.PathNotChild,
                              osutils.relpath, base, 'H:/path')
709
class TestSafeUnicode(tests.TestCase):
711
def test_from_ascii_string(self):
    """An ascii str comes back as the equal unicode string."""
    converted = osutils.safe_unicode('foobar')
    self.assertEqual(u'foobar', converted)
714
def test_from_unicode_string_ascii_contents(self):
    """A unicode string with ascii contents compares equal afterwards."""
    converted = osutils.safe_unicode(u'bargam')
    self.assertEqual(u'bargam', converted)
717
def test_from_unicode_string_unicode_contents(self):
    """A unicode string with a non-ascii character compares equal afterwards."""
    converted = osutils.safe_unicode(u'bargam\xae')
    self.assertEqual(u'bargam\xae', converted)
720
def test_from_utf8_string(self):
    """A UTF-8 encoded str is decoded to the matching unicode string."""
    converted = osutils.safe_unicode('foo\xc2\xae')
    self.assertEqual(u'foo\xae', converted)
723
def test_bad_utf8_string(self):
724
self.assertRaises(errors.BzrBadParameterNotUnicode,
725
osutils.safe_unicode,
729
class TestSafeUtf8(tests.TestCase):
731
def test_from_ascii_string(self):
733
self.assertEqual('foobar', osutils.safe_utf8(f))
735
def test_from_unicode_string_ascii_contents(self):
    """A unicode string with ascii contents gives the equal str."""
    encoded = osutils.safe_utf8(u'bargam')
    self.assertEqual('bargam', encoded)
738
def test_from_unicode_string_unicode_contents(self):
    """A non-ascii unicode character is encoded as UTF-8 bytes."""
    encoded = osutils.safe_utf8(u'bargam\xae')
    self.assertEqual('bargam\xc2\xae', encoded)
741
def test_from_utf8_string(self):
    """A str that is already UTF-8 compares equal afterwards."""
    encoded = osutils.safe_utf8('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', encoded)
744
def test_bad_utf8_string(self):
    """A str that is not valid UTF-8 raises BzrBadParameterNotUnicode."""
    invalid = '\xbb\xbb'
    self.assertRaises(errors.BzrBadParameterNotUnicode,
                      osutils.safe_utf8, invalid)
749
class TestSafeRevisionId(tests.TestCase):
751
def test_from_ascii_string(self):
    """An ascii str revision id compares equal afterwards."""
    # this shouldn't give a warning because it's getting an ascii string
    revision_id = osutils.safe_revision_id('foobar')
    self.assertEqual('foobar', revision_id)
755
def test_from_unicode_string_ascii_contents(self):
    """A unicode id with ascii contents gives the equal str (warn=False)."""
    revision_id = osutils.safe_revision_id(u'bargam', warn=False)
    self.assertEqual('bargam', revision_id)
759
def test_from_unicode_deprecated(self):
    """A unicode id without warn=False goes through callDeprecated."""
    expected_warnings = [osutils._revision_id_warning]
    revision_id = self.callDeprecated(expected_warnings,
                                      osutils.safe_revision_id, u'bargam')
    self.assertEqual('bargam', revision_id)
764
def test_from_unicode_string_unicode_contents(self):
    """A non-ascii unicode id is encoded as UTF-8 bytes (warn=False)."""
    revision_id = osutils.safe_revision_id(u'bargam\xae', warn=False)
    self.assertEqual('bargam\xc2\xae', revision_id)
768
def test_from_utf8_string(self):
    """A revision id that is already a UTF-8 str compares equal afterwards."""
    revision_id = osutils.safe_revision_id('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', revision_id)
773
"""Currently, None is a valid revision_id"""
774
self.assertEqual(None, osutils.safe_revision_id(None))
777
class TestSafeFileId(tests.TestCase):
779
def test_from_ascii_string(self):
    """An ascii str file id compares equal afterwards."""
    file_id = osutils.safe_file_id('foobar')
    self.assertEqual('foobar', file_id)
782
def test_from_unicode_string_ascii_contents(self):
    """A unicode id with ascii contents gives the equal str (warn=False)."""
    file_id = osutils.safe_file_id(u'bargam', warn=False)
    self.assertEqual('bargam', file_id)
785
def test_from_unicode_deprecated(self):
    """A unicode id without warn=False goes through callDeprecated."""
    expected_warnings = [osutils._file_id_warning]
    file_id = self.callDeprecated(expected_warnings,
                                  osutils.safe_file_id, u'bargam')
    self.assertEqual('bargam', file_id)
790
def test_from_unicode_string_unicode_contents(self):
    """A non-ascii unicode id is encoded as UTF-8 bytes (warn=False)."""
    file_id = osutils.safe_file_id(u'bargam\xae', warn=False)
    self.assertEqual('bargam\xc2\xae', file_id)
794
def test_from_utf8_string(self):
    """A file id that is already a UTF-8 str compares equal afterwards."""
    file_id = osutils.safe_file_id('foo\xc2\xae')
    self.assertEqual('foo\xc2\xae', file_id)
799
"""Currently, None is a valid revision_id"""
800
self.assertEqual(None, osutils.safe_file_id(None))
803
class TestWin32Funcs(tests.TestCase):
804
"""Test that _win32 versions of os utilities return appropriate paths."""
806
def test_abspath(self):
    """_win32_abspath() gives forward-slash drive and //HOST paths."""
    # (expected, input), checked in this order.
    cases = [
        ('C:/foo', 'C:\\foo'),
        ('C:/foo', 'C:/foo'),
        ('//HOST/path', r'\\HOST\path'),
        ('//HOST/path', '//HOST/path'),
        ]
    for expected, path in cases:
        self.assertEqual(expected, osutils._win32_abspath(path))
812
def test_realpath(self):
    """_win32_realpath() gives 'C:/foo' for both slash styles."""
    for path in ('C:\\foo', 'C:/foo'):
        self.assertEqual('C:/foo', osutils._win32_realpath(path))
816
def test_pathjoin(self):
    """Check _win32_pathjoin() on relative, drive and rooted components."""
    # (expected, components), checked in this order.
    cases = [
        ('path/to/foo', ('path', 'to', 'foo')),
        ('C:/foo', ('path\\to', 'C:\\foo')),
        ('C:/foo', ('path/to', 'C:/foo')),
        ('path/to/foo', ('path/to/', 'foo')),
        ('/foo', ('C:/path/to/', '/foo')),
        ('/foo', ('C:\\path\\to\\', '\\foo')),
        ]
    for expected, parts in cases:
        self.assertEqual(expected, osutils._win32_pathjoin(*parts))
830
def test_normpath(self):
    """_win32_normpath() collapses '..', '.' and doubled separators."""
    messy_paths = (r'path\\from\..\to\.\foo', 'path//from/../to/./foo')
    for messy in messy_paths:
        self.assertEqual('path/to/foo', osutils._win32_normpath(messy))
836
def test_getcwd(self):
    """Compare _win32_getcwd() with os.getcwdu() piecewise."""
    actual = osutils._win32_getcwd()
    native = os.getcwdu()
    # Everything after the first character matches once backslashes
    # are turned into forward slashes.
    native_tail = native[1:].replace('\\', '/')
    self.assertEqual(native_tail, actual[1:])
    # win32 is inconsistent whether it returns lower or upper case
    # and even if it was consistent the user might type the other
    # so we force it to uppercase
    # running python.exe under cmd.exe return capital C:\\
    # running win32 python inside a cygwin shell returns lowercase
    native_head = native[0].upper()
    self.assertEqual(native_head, actual[0])
847
def test_fixdrive(self):
    """_win32_fixdrive() upper-cases the drive letter only."""
    # (expected, input), checked in this order.
    cases = [
        ('H:/foo', 'h:/foo'),
        ('H:/foo', 'H:/foo'),
        ('C:\\foo', 'c:\\foo'),
        ]
    for expected, path in cases:
        self.assertEqual(expected, osutils._win32_fixdrive(path))
852
def test_win98_abspath(self):
854
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
855
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
857
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
858
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
860
cwd = osutils.getcwd().rstrip('/')
861
drive = osutils._nt_splitdrive(cwd)[0]
862
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
863
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
866
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
869
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
870
"""Test win32 functions that create files."""
872
def test_getcwd(self):
873
self.requireFeature(tests.UnicodeFilenameFeature)
876
# TODO: jam 20060427 This will probably fail on Mac OSX because
877
# it will change the normalization of B\xe5gfors
878
# Consider using a different unicode character, or make
879
# osutils.getcwd() renormalize the path.
880
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
882
def test_minimum_path_selection(self):
    """Check minimum_path_selection() on empty, flat and nested inputs."""
    # (expected set, input paths), checked in this order.
    cases = [
        (set(), []),
        (set(['a']), ['a']),
        (set(['a', 'b']), ['a', 'b']),
        (set(['a/', 'b']), ['a/', 'b']),
        (set(['a/', 'b']), ['a/c', 'a/', 'b']),
        (set(['a-b', 'a', 'a0b']), ['a-b', 'a/b', 'a0b', 'a']),
        ]
    for expected, paths in cases:
        self.assertEqual(expected, osutils.minimum_path_selection(paths))
896
def test_mkdtemp(self):
    """The path returned by _win32_mkdtemp() contains no backslash."""
    created = osutils._win32_mkdtemp(dir='.')
    has_backslash = '\\' in created
    self.assertFalse(has_backslash)
900
def test_rename(self):
908
osutils._win32_rename('b', 'a')
909
self.failUnlessExists('a')
910
self.failIfExists('b')
911
self.assertFileEqual('baz\n', 'a')
913
def test_rename_missing_file(self):
919
osutils._win32_rename('b', 'a')
920
except (IOError, OSError), e:
921
self.assertEqual(errno.ENOENT, e.errno)
922
self.assertFileEqual('foo\n', 'a')
924
def test_rename_missing_dir(self):
927
osutils._win32_rename('b', 'a')
928
except (IOError, OSError), e:
929
self.assertEqual(errno.ENOENT, e.errno)
931
def test_rename_current_dir(self):
934
# You can't rename the working directory
935
# doing rename non-existent . usually
936
# just raises ENOENT, since non-existent
939
osutils._win32_rename('b', '.')
940
except (IOError, OSError), e:
941
self.assertEqual(errno.ENOENT, e.errno)
943
def test_splitpath(self):
944
def check(expected, path):
945
self.assertEqual(expected, osutils.splitpath(path))
948
check(['a', 'b'], 'a/b')
949
check(['a', 'b'], 'a/./b')
950
check(['a', '.b'], 'a/.b')
951
check(['a', '.b'], 'a\\.b')
953
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
956
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        """Parents are listed from the nearest one outwards."""
        # (expected parents, path), checked in this order.
        cases = [
            ([], 'a'),
            (['a'], 'a/b'),
            (['a/b', 'a'], 'a/b/c'),
            ]
        for expected, path in cases:
            self.assertEqual(expected, osutils.parent_directories(path))
965
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def test_getcwd(self):
        """_mac_getcwd() ends with the unicode directory just entered."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        dirname = u'B\xe5gfors'
        os.mkdir(dirname)
        os.chdir(dirname)
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')

    def test_getcwd_nonnorm(self):
        """A directory made with a combining character reads back composed."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path
        decomposed = u'Ba\u030agfors'
        os.mkdir(decomposed)
        os.chdir(decomposed)
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
982
class TestChunksToLines(tests.TestCase):
984
def test_smoketest(self):
    """chunks_to_lines() gives one line per item however input is chunked."""
    wanted = ['foo\n', 'bar\n', 'baz\n']
    # Chunk boundaries that fall in the middle of lines.
    split_midline = osutils.chunks_to_lines(['foo\nbar', '\nbaz\n'])
    self.assertEqual(wanted, split_midline)
    # Input that already has exactly one line per chunk.
    already_lines = osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n'])
    self.assertEqual(wanted, already_lines)
990
def test_osutils_binding(self):
991
from bzrlib.tests import test__chunks_to_lines
992
if test__chunks_to_lines.compiled_chunkstolines_feature.available():
993
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
995
from bzrlib._chunks_to_lines_py import chunks_to_lines
996
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
999
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines()."""

    def test_split_unicode(self):
        """Unicode text splits at newlines, with or without a final one."""
        cases = [
            (u'foo\nbar\xae', [u'foo\n', u'bar\xae']),
            (u'foo\nbar\xae\n', [u'foo\n', u'bar\xae\n']),
        ]
        for text, expected in cases:
            self.assertEqual(expected, osutils.split_lines(text))

    def test_split_with_carriage_returns(self):
        """A bare carriage return does not end a line."""
        text = 'foo\rbar\n'
        self.assertEqual(['foo\rbar\n'], osutils.split_lines(text))
1012
class TestWalkDirs(tests.TestCaseInTempDir):
1014
def assertExpectedBlocks(self, expected, result):
    """Assert result equals expected once entries are cut to three fields.

    :param expected: list of (dirinfo, entries) pairs to compare against.
    :param result: iterable of (dirinfo, block) pairs; only the first
        three items of every entry in a block take part in the comparison.
    """
    trimmed = []
    for dirinfo, block in result:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected, trimmed)
1019
def test_walkdirs(self):
1028
self.build_tree(tree)
1029
expected_dirblocks = [
1031
[('0file', '0file', 'file'),
1032
('1dir', '1dir', 'directory'),
1033
('2file', '2file', 'file'),
1036
(('1dir', './1dir'),
1037
[('1dir/0file', '0file', 'file'),
1038
('1dir/1dir', '1dir', 'directory'),
1041
(('1dir/1dir', './1dir/1dir'),
1047
found_bzrdir = False
1048
for dirdetail, dirblock in osutils.walkdirs('.'):
1049
if len(dirblock) and dirblock[0][1] == '.bzr':
1050
# this tests the filtering of selected paths
1053
result.append((dirdetail, dirblock))
1055
self.assertTrue(found_bzrdir)
1056
self.assertExpectedBlocks(expected_dirblocks, result)
1057
# you can search a subdir only, with a supplied prefix.
1059
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1060
result.append(dirblock)
1061
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1063
def test_walkdirs_os_error(self):
1064
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1065
# Pyrex readdir didn't raise useful messages if it had an error
1066
# reading the directory
1067
if sys.platform == 'win32':
1068
raise tests.TestNotApplicable(
1069
"readdir IOError not tested on win32")
1070
os.mkdir("test-unreadable")
1071
os.chmod("test-unreadable", 0000)
1072
# must chmod it back so that it can be removed
1073
self.addCleanup(os.chmod, "test-unreadable", 0700)
1074
# The error is not raised until the generator is actually evaluated.
1075
# (It would be ok if it happened earlier but at the moment it
1077
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1078
self.assertEquals('./test-unreadable', e.filename)
1079
self.assertEquals(errno.EACCES, e.errno)
1080
# Ensure the message contains the file name
1081
self.assertContainsRe(str(e), "\./test-unreadable")
1083
def test__walkdirs_utf8(self):
# Walks '.' with osutils._walkdirs_utf8 and compares the blocks found
# against expected_dirblocks via assertExpectedBlocks.
1092
self.build_tree(tree)
1093
expected_dirblocks = [
1095
[('0file', '0file', 'file'),
1096
('1dir', '1dir', 'directory'),
1097
('2file', '2file', 'file'),
1100
(('1dir', './1dir'),
1101
[('1dir/0file', '0file', 'file'),
1102
('1dir/1dir', '1dir', 'directory'),
1105
(('1dir/1dir', './1dir/1dir'),
1111
found_bzrdir = False
1112
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1113
if len(dirblock) and dirblock[0][1] == '.bzr':
1114
# this tests the filtering of selected paths
1117
result.append((dirdetail, dirblock))
1119
self.assertTrue(found_bzrdir)
1120
self.assertExpectedBlocks(expected_dirblocks, result)
1122
# you can search a subdir only, with a supplied prefix.
# NOTE(review): the loop below calls osutils.walkdirs(), whereas the loop
# above exercises osutils._walkdirs_utf8() -- this looks like a copy/paste
# from test_walkdirs; confirm which function was meant to be tested here.
1124
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1125
result.append(dirblock)
1126
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1128
def _filter_out_stat(self, result):
1129
"""Filter out the stat value from the walkdirs result"""
1130
for dirdetail, dirblock in result:
1132
for info in dirblock:
1133
# Ignore info[3] which is the stat
1134
new_dirblock.append((info[0], info[1], info[2], info[4]))
1135
dirblock[:] = new_dirblock
1137
def _save_platform_info(self):
1138
cur_winver = win32utils.winver
1139
cur_fs_enc = osutils._fs_enc
1140
cur_dir_reader = osutils._selected_dir_reader
1142
win32utils.winver = cur_winver
1143
osutils._fs_enc = cur_fs_enc
1144
osutils._selected_dir_reader = cur_dir_reader
1145
self.addCleanup(restore)
1147
def assertDirReaderIs(self, expected):
    """Assert that _walkdirs_utf8 selects a reader of the expected class.

    :param expected: class the selected dir reader must be an instance of.
    """
    # Drop any previously selected reader so it has to be detected again.
    osutils._selected_dir_reader = None
    # There is nothing to list, but the walk still runs the selection
    # logic.
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([(('', '.'), [])], walked)
    chosen = osutils._selected_dir_reader
    self.assertIsInstance(chosen, expected)
1155
def test_force_walkdirs_utf8_fs_utf8(self):
    """A 'UTF-8' filesystem encoding selects the UTF8 dir reader."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'UTF-8'
    wanted_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(wanted_reader)
1162
def test_force_walkdirs_utf8_fs_ascii(self):
    """A 'US-ASCII' filesystem encoding also selects the UTF8 dir reader."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'US-ASCII'
    wanted_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(wanted_reader)
1169
def test_force_walkdirs_utf8_fs_ANSI(self):
    """The 'ANSI_X3.4-1968' encoding name selects the UTF8 dir reader."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'ANSI_X3.4-1968'
    wanted_reader = UTF8DirReaderFeature.reader
    self.assertDirReaderIs(wanted_reader)
1176
def test_force_walkdirs_utf8_fs_latin1(self):
    """A 'latin1' filesystem encoding selects UnicodeDirReader."""
    self._save_platform_info()
    # Avoid the win32 detection code
    win32utils.winver = None
    osutils._fs_enc = 'latin1'
    wanted_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(wanted_reader)
1182
def test_force_walkdirs_utf8_nt(self):
    """With winver set to 'Windows NT' the Win32ReadDir reader is chosen."""
    # NOTE(review): the earlier comment here read "Disabled because the
    # thunk of the whole walkdirs api is disabled", yet nothing visible in
    # this test disables it -- confirm whether that remark still applies.
    self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    from bzrlib._walkdirs_win32 import Win32ReadDir
    wanted_reader = Win32ReadDir
    self.assertDirReaderIs(wanted_reader)
1190
def test_force_walkdirs_utf8_98(self):
    """With winver set to 'Windows 98' UnicodeDirReader is chosen."""
    self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    wanted_reader = osutils.UnicodeDirReader
    self.assertDirReaderIs(wanted_reader)
1196
def test_unicode_walkdirs(self):
1197
"""Walkdirs should always return unicode paths."""
1198
self.requireFeature(tests.UnicodeFilenameFeature)
1199
name0 = u'0file-\xb6'
1200
name1 = u'1dir-\u062c\u0648'
1201
name2 = u'2file-\u0633'
1205
name1 + '/' + name0,
1206
name1 + '/' + name1 + '/',
1209
self.build_tree(tree)
1210
expected_dirblocks = [
1212
[(name0, name0, 'file', './' + name0),
1213
(name1, name1, 'directory', './' + name1),
1214
(name2, name2, 'file', './' + name2),
1217
((name1, './' + name1),
1218
[(name1 + '/' + name0, name0, 'file', './' + name1
1220
(name1 + '/' + name1, name1, 'directory', './' + name1
1224
((name1 + '/' + name1, './' + name1 + '/' + name1),
1229
result = list(osutils.walkdirs('.'))
1230
self._filter_out_stat(result)
1231
self.assertEqual(expected_dirblocks, result)
1232
result = list(osutils.walkdirs(u'./'+name1, name1))
1233
self._filter_out_stat(result)
1234
self.assertEqual(expected_dirblocks[1:], result)
1236
def test_unicode__walkdirs_utf8(self):
1237
"""Walkdirs_utf8 should always return utf8 paths.
1239
The abspath portion might be in unicode or utf-8
1241
self.requireFeature(tests.UnicodeFilenameFeature)
1242
name0 = u'0file-\xb6'
1243
name1 = u'1dir-\u062c\u0648'
1244
name2 = u'2file-\u0633'
1248
name1 + '/' + name0,
1249
name1 + '/' + name1 + '/',
1252
self.build_tree(tree)
1253
name0 = name0.encode('utf8')
1254
name1 = name1.encode('utf8')
1255
name2 = name2.encode('utf8')
1257
expected_dirblocks = [
1259
[(name0, name0, 'file', './' + name0),
1260
(name1, name1, 'directory', './' + name1),
1261
(name2, name2, 'file', './' + name2),
1264
((name1, './' + name1),
1265
[(name1 + '/' + name0, name0, 'file', './' + name1
1267
(name1 + '/' + name1, name1, 'directory', './' + name1
1271
((name1 + '/' + name1, './' + name1 + '/' + name1),
1277
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1278
# all abspaths are Unicode, and encode them back into utf8.
1279
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1280
self.assertIsInstance(dirdetail[0], str)
1281
if isinstance(dirdetail[1], unicode):
1282
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1283
dirblock = [list(info) for info in dirblock]
1284
for info in dirblock:
1285
self.assertIsInstance(info[4], unicode)
1286
info[4] = info[4].encode('utf8')
1288
for info in dirblock:
1289
self.assertIsInstance(info[0], str)
1290
self.assertIsInstance(info[1], str)
1291
self.assertIsInstance(info[4], str)
1292
# Remove the stat information
1293
new_dirblock.append((info[0], info[1], info[2], info[4]))
1294
result.append((dirdetail, new_dirblock))
1295
self.assertEqual(expected_dirblocks, result)
1297
def test__walkdirs_utf8_with_unicode_fs(self):
1298
"""UnicodeDirReader should be a safe fallback everywhere
1300
The abspath portion should be in unicode
1302
self.requireFeature(tests.UnicodeFilenameFeature)
1303
# Use the unicode reader. TODO: split into driver-and-driven unit
1305
self._save_platform_info()
1306
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1307
name0u = u'0file-\xb6'
1308
name1u = u'1dir-\u062c\u0648'
1309
name2u = u'2file-\u0633'
1313
name1u + '/' + name0u,
1314
name1u + '/' + name1u + '/',
1317
self.build_tree(tree)
1318
name0 = name0u.encode('utf8')
1319
name1 = name1u.encode('utf8')
1320
name2 = name2u.encode('utf8')
1322
# All of the abspaths should be in unicode, all of the relative paths
1324
expected_dirblocks = [
1326
[(name0, name0, 'file', './' + name0u),
1327
(name1, name1, 'directory', './' + name1u),
1328
(name2, name2, 'file', './' + name2u),
1331
((name1, './' + name1u),
1332
[(name1 + '/' + name0, name0, 'file', './' + name1u
1334
(name1 + '/' + name1, name1, 'directory', './' + name1u
1338
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1343
result = list(osutils._walkdirs_utf8('.'))
1344
self._filter_out_stat(result)
1345
self.assertEqual(expected_dirblocks, result)
1347
def test__walkdirs_utf8_win32readdir(self):
1348
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1349
self.requireFeature(tests.UnicodeFilenameFeature)
1350
from bzrlib._walkdirs_win32 import Win32ReadDir
1351
self._save_platform_info()
1352
osutils._selected_dir_reader = Win32ReadDir()
1353
name0u = u'0file-\xb6'
1354
name1u = u'1dir-\u062c\u0648'
1355
name2u = u'2file-\u0633'
1359
name1u + '/' + name0u,
1360
name1u + '/' + name1u + '/',
1363
self.build_tree(tree)
1364
name0 = name0u.encode('utf8')
1365
name1 = name1u.encode('utf8')
1366
name2 = name2u.encode('utf8')
1368
# All of the abspaths should be in unicode, all of the relative paths
1370
expected_dirblocks = [
1372
[(name0, name0, 'file', './' + name0u),
1373
(name1, name1, 'directory', './' + name1u),
1374
(name2, name2, 'file', './' + name2u),
1377
((name1, './' + name1u),
1378
[(name1 + '/' + name0, name0, 'file', './' + name1u
1380
(name1 + '/' + name1, name1, 'directory', './' + name1u
1384
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1389
result = list(osutils._walkdirs_utf8(u'.'))
1390
self._filter_out_stat(result)
1391
self.assertEqual(expected_dirblocks, result)
1393
def assertStatIsCorrect(self, path, win32stat):
    """Check a stat-like object against os.stat(path).

    :param path: path handed to os.stat() to obtain the reference values.
    :param win32stat: object exposing the same st_* attributes.

    Size, device, inode and mode must be equal; the three timestamps only
    have to agree to 4 decimal places.
    """
    os_stat = os.stat(path)
    self.assertEqual(os_stat.st_size, win32stat.st_size)
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(os_stat, field),
                               getattr(win32stat, field), places=4)
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(os_stat, field), getattr(win32stat, field))
1403
def test__walkdirs_utf_win32_find_file_stat_file(self):
1404
"""make sure our Stat values are valid"""
1405
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1406
self.requireFeature(tests.UnicodeFilenameFeature)
1407
from bzrlib._walkdirs_win32 import Win32ReadDir
1408
name0u = u'0file-\xb6'
1409
name0 = name0u.encode('utf8')
1410
self.build_tree([name0u])
1411
# I hate to sleep() here, but I'm trying to make the ctime different
1414
f = open(name0u, 'ab')
1416
f.write('just a small update')
1420
result = Win32ReadDir().read_dir('', u'.')
1422
self.assertEqual((name0, name0, 'file'), entry[:3])
1423
self.assertEqual(u'./' + name0u, entry[4])
1424
self.assertStatIsCorrect(entry[4], entry[3])
1425
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1427
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1428
"""make sure our Stat values are valid"""
1429
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1430
self.requireFeature(tests.UnicodeFilenameFeature)
1431
from bzrlib._walkdirs_win32 import Win32ReadDir
1432
name0u = u'0dir-\u062c\u0648'
1433
name0 = name0u.encode('utf8')
1434
self.build_tree([name0u + '/'])
1436
result = Win32ReadDir().read_dir('', u'.')
1438
self.assertEqual((name0, name0, 'directory'), entry[:3])
1439
self.assertEqual(u'./' + name0u, entry[4])
1440
self.assertStatIsCorrect(entry[4], entry[3])
1442
def assertPathCompare(self, path_less, path_greater):
    """Check that path_less orders before path_greater.

    Uses osutils.compare_paths_prefix_order: each path must compare as 0
    against itself, and the pair must give -1 one way round and 1 the
    other.
    """
    compare = osutils.compare_paths_prefix_order
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1453
def test_compare_paths_prefix_order(self):
    """Ordering holds for sample paths with and without a leading '/'."""
    ordered_pairs = [
        # root before all else
        ("/", "/a"),
        # alpha within a dir
        ("/a", "/b"),
        ("/b", "/z"),
        # high dirs before lower.
        ("/z", "/a/a"),
        # except if the deeper dir should be output first
        ("/a/b/c", "/d/g"),
        # lexical between dirs of the same height
        ("/a/z", "/z/z"),
        ("/a/c/z", "/a/d/e"),
    ]
    for lesser, greater in ordered_pairs:
        self.assertPathCompare(lesser, greater)

    # this should also be consistent for no leading / paths: the same
    # samples with the first character (the '/') stripped, so "/" becomes
    # "" and "/a/b/c" becomes "a/b/c".
    for lesser, greater in ordered_pairs:
        self.assertPathCompare(lesser[1:], greater[1:])
1481
def test_path_prefix_sorting(self):
1482
"""Doing a sort on path prefix should match our sample data."""
1497
dir_sorted_paths = [
1513
sorted(original_paths, key=osutils.path_prefix_key))
1514
# using the comparison routine should work too:
1517
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1520
class TestCopyTree(tests.TestCaseInTempDir):
1522
def test_copy_basic_tree(self):
    """copy_tree() reproduces files and a nested directory in the target."""
    layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(layout)
    osutils.copy_tree('source', 'target')
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1528
def test_copy_tree_target_exists(self):
1529
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1531
osutils.copy_tree('source', 'target')
1532
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1533
self.assertEqual(['c'], os.listdir('target/b'))
1535
def test_copy_tree_symlinks(self):
    """copy_tree() recreates a symlink that points at the same target."""
    self.requireFeature(tests.SymlinkFeature)
    link_target = 'a/generic/path'
    self.build_tree(['source/'])
    os.symlink(link_target, 'source/lnk')
    osutils.copy_tree('source', 'target')
    self.assertEqual(['lnk'], os.listdir('target'))
    self.assertEqual(link_target, os.readlink('target/lnk'))
1543
def test_copy_tree_handlers(self):
1544
processed_files = []
1545
processed_links = []
1546
def file_handler(from_path, to_path):
1547
processed_files.append(('f', from_path, to_path))
1548
def dir_handler(from_path, to_path):
1549
processed_files.append(('d', from_path, to_path))
1550
def link_handler(from_path, to_path):
1551
processed_links.append((from_path, to_path))
1552
handlers = {'file':file_handler,
1553
'directory':dir_handler,
1554
'symlink':link_handler,
1557
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1558
if osutils.has_symlinks():
1559
os.symlink('a/generic/path', 'source/lnk')
1560
osutils.copy_tree('source', 'target', handlers=handlers)
1562
self.assertEqual([('d', 'source', 'target'),
1563
('f', 'source/a', 'target/a'),
1564
('d', 'source/b', 'target/b'),
1565
('f', 'source/b/c', 'target/b/c'),
1567
self.failIfExists('target')
1568
if osutils.has_symlinks():
1569
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1572
class TestSetUnsetEnv(tests.TestCase):
1573
"""Test updating the environment"""
1576
super(TestSetUnsetEnv, self).setUp()
1578
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1579
'Environment was not cleaned up properly.'
1580
' Variable BZR_TEST_ENV_VAR should not exist.')
1582
if 'BZR_TEST_ENV_VAR' in os.environ:
1583
del os.environ['BZR_TEST_ENV_VAR']
1585
self.addCleanup(cleanup)
1588
"""Test that we can set an env variable"""
1589
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1590
self.assertEqual(None, old)
1591
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1593
def test_double_set(self):
    """A second set returns the first value and leaves the new one set."""
    var = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(var, 'foo')
    previous = osutils.set_or_unset_env(var, 'bar')
    self.assertEqual('foo', previous)
    self.assertEqual('bar', os.environ.get(var))
1600
def test_unicode(self):
1601
"""Environment can only contain plain strings
1603
So Unicode strings must be encoded.
1605
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1607
raise tests.TestSkipped(
1608
'Cannot find a unicode character that works in encoding %s'
1609
% (osutils.get_user_encoding(),))
1611
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1612
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1614
def test_unset(self):
    """Passing None removes the variable and returns its old value."""
    var = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(var, 'foo')
    previous = osutils.set_or_unset_env(var, None)
    self.assertEqual('foo', previous)
    # Check removal both through .get() and through membership.
    self.assertEqual(None, os.environ.get(var))
    self.failIf(var in os.environ)
1623
class TestSizeShaFile(tests.TestCaseInTempDir):
1625
def test_sha_empty(self):
1626
self.build_tree_contents([('foo', '')])
1627
expected_sha = osutils.sha_string('')
1629
self.addCleanup(f.close)
1630
size, sha = osutils.size_sha_file(f)
1631
self.assertEqual(0, size)
1632
self.assertEqual(expected_sha, sha)
1634
def test_sha_mixed_endings(self):
    """size_sha_file() reports 38 bytes and the sha of the exact text."""
    content = 'test\r\nwith\nall\rpossible line endings\r\n'
    self.build_tree_contents([('foo', content)])
    wanted_sha = osutils.sha_string(content)
    stream = open('foo', 'rb')
    self.addCleanup(stream.close)
    size, sha = osutils.size_sha_file(stream)
    self.assertEqual(38, size)
    self.assertEqual(wanted_sha, sha)
1645
class TestShaFileByName(tests.TestCaseInTempDir):
    """sha_file_by_name() must agree with sha_string() on file content."""

    def _check_sha(self, text):
        """Write text to 'foo' and compare both ways of hashing it."""
        self.build_tree_contents([('foo', text)])
        expected_sha = osutils.sha_string(text)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_empty(self):
        """An empty file hashes like the empty string."""
        self._check_sha('')

    def test_sha_mixed_endings(self):
        """Content mixing CRLF, LF and CR hashes like the same string."""
        self._check_sha('test\r\nwith\nall\rpossible line endings\r\n')
1659
class TestResourceLoading(tests.TestCaseInTempDir):
1661
def test_resource_string(self):
1662
# test resource in bzrlib
1663
text = osutils.resource_string('bzrlib', 'debug.py')
1664
self.assertContainsRe(text, "debug_flags = set()")
1665
# test resource under bzrlib
1666
text = osutils.resource_string('bzrlib.ui', 'text.py')
1667
self.assertContainsRe(text, "class TextUIFactory")
1668
# test unsupported package
1669
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1671
# test unknown resource
1672
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1675
class TestReCompile(tests.TestCase):
1677
def test_re_compile_checked(self):
    """The flags argument is honoured: 'A*' matches lower and mixed case."""
    pattern = osutils.re_compile_checked(r'A*', re.IGNORECASE)
    for candidate in ('aaaa', 'aAaA'):
        self.assertTrue(pattern.match(candidate))
1682
def test_re_compile_checked_error(self):
1683
# like https://bugs.launchpad.net/bzr/+bug/251352
1684
err = self.assertRaises(
1685
errors.BzrCommandError,
1686
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1688
"Invalid regular expression in test case: '*': "
1689
"nothing to repeat",
1693
class TestDirReader(tests.TestCaseInTempDir):
1696
_dir_reader_class = None
1697
_native_to_unicode = None
1700
tests.TestCaseInTempDir.setUp(self)
1702
# Save platform specific info and reset it
1703
cur_dir_reader = osutils._selected_dir_reader
1706
osutils._selected_dir_reader = cur_dir_reader
1707
self.addCleanup(restore)
1709
osutils._selected_dir_reader = self._dir_reader_class()
1711
def _get_ascii_tree(self):
1719
expected_dirblocks = [
1721
[('0file', '0file', 'file'),
1722
('1dir', '1dir', 'directory'),
1723
('2file', '2file', 'file'),
1726
(('1dir', './1dir'),
1727
[('1dir/0file', '0file', 'file'),
1728
('1dir/1dir', '1dir', 'directory'),
1731
(('1dir/1dir', './1dir/1dir'),
1736
return tree, expected_dirblocks
1738
def test_walk_cur_dir(self):
    """Walking '.' yields the expected blocks for the ascii tree."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    walked = list(osutils._walkdirs_utf8('.'))
    # Keep only the first three fields of each entry, which filters out
    # stat and abspath.
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected_dirblocks, trimmed)
1747
def test_walk_sub_dir(self):
    """Walking './1dir' with prefix '1dir' yields all but the root block."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # you can search a subdir only, with a supplied prefix.
    walked = list(osutils._walkdirs_utf8('./1dir', '1dir'))
    # Keep only the first three fields of each entry, which filters out
    # stat and abspath.
    trimmed = []
    for dirinfo, block in walked:
        trimmed.append((dirinfo, [line[0:3] for line in block]))
    self.assertEqual(expected_dirblocks[1:], trimmed)
1757
def _get_unicode_tree(self):
1758
name0u = u'0file-\xb6'
1759
name1u = u'1dir-\u062c\u0648'
1760
name2u = u'2file-\u0633'
1764
name1u + '/' + name0u,
1765
name1u + '/' + name1u + '/',
1768
name0 = name0u.encode('UTF-8')
1769
name1 = name1u.encode('UTF-8')
1770
name2 = name2u.encode('UTF-8')
1771
expected_dirblocks = [
1773
[(name0, name0, 'file', './' + name0u),
1774
(name1, name1, 'directory', './' + name1u),
1775
(name2, name2, 'file', './' + name2u),
1778
((name1, './' + name1u),
1779
[(name1 + '/' + name0, name0, 'file', './' + name1u
1781
(name1 + '/' + name1, name1, 'directory', './' + name1u
1785
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1790
return tree, expected_dirblocks
1792
def _filter_out(self, raw_dirblocks):
1793
"""Filter out a walkdirs_utf8 result.
1795
stat field is removed, all native paths are converted to unicode
1797
filtered_dirblocks = []
1798
for dirinfo, block in raw_dirblocks:
1799
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1802
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1803
filtered_dirblocks.append((dirinfo, details))
1804
return filtered_dirblocks
1806
def test_walk_unicode_tree(self):
    """Walking the unicode tree matches the expected blocks once filtered."""
    self.requireFeature(tests.UnicodeFilenameFeature)
    tree, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(tree)
    raw_blocks = list(osutils._walkdirs_utf8('.'))
    filtered = self._filter_out(raw_blocks)
    self.assertEqual(expected_dirblocks, filtered)
1813
def test_symlink(self):
1814
self.requireFeature(tests.SymlinkFeature)
1815
self.requireFeature(tests.UnicodeFilenameFeature)
1816
target = u'target\N{Euro Sign}'
1817
link_name = u'l\N{Euro Sign}nk'
1818
os.symlink(target, link_name)
1819
target_utf8 = target.encode('UTF-8')
1820
link_name_utf8 = link_name.encode('UTF-8')
1821
expected_dirblocks = [
1823
[(link_name_utf8, link_name_utf8,
1824
'symlink', './' + link_name),],
1826
result = list(osutils._walkdirs_utf8('.'))
1827
self.assertEqual(expected_dirblocks, self._filter_out(result))
1830
class TestReadLink(tests.TestCaseInTempDir):
1831
"""Exposes os.readlink() problems and the osutils solution.
1833
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1834
unicode string will be returned if a unicode string is passed.
1836
But prior python versions failed to properly encode the passed unicode
1839
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1842
super(tests.TestCaseInTempDir, self).setUp()
1843
self.link = u'l\N{Euro Sign}ink'
1844
self.target = u'targe\N{Euro Sign}t'
1845
os.symlink(self.target, self.link)
1847
def test_os_readlink_link_encoding(self):
1848
if sys.version_info < (2, 6):
1849
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1851
self.assertEquals(self.target, os.readlink(self.link))
1853
def test_os_readlink_link_decoding(self):
    """os.readlink() on an encoded link name returns the encoded target."""
    encoded_target = self.target.encode(osutils._fs_enc)
    encoded_link = self.link.encode(osutils._fs_enc)
    self.assertEquals(encoded_target, os.readlink(encoded_link))
1858
class TestConcurrency(tests.TestCase):
1861
super(TestConcurrency, self).setUp()
1862
orig = osutils._cached_local_concurrency
1864
osutils._cached_local_concurrency = orig
1865
self.addCleanup(restore)
1867
def test_local_concurrency(self):
    """local_concurrency() returns an int."""
    self.assertIsInstance(osutils.local_concurrency(), int)
1871
def test_local_concurrency_environment_variable(self):
    """BZR_CONCURRENCY sets the result; a non-numeric value gives 1."""
    for raw_value, expected in [('2', 2), ('3', 3), ('foo', 1)]:
        os.environ['BZR_CONCURRENCY'] = raw_value
        # use_cache=False so every call re-reads the environment value.
        self.assertEqual(expected,
                         osutils.local_concurrency(use_cache=False))
1879
def test_option_concurrency(self):
    """--concurrency on the command line replaces BZR_CONCURRENCY."""
    env_key = 'BZR_CONCURRENCY'
    os.environ[env_key] = '1'
    self.run_bzr('rocks --concurrency 42')
    # Command line overrides environment variable
    self.assertEquals('42', os.environ[env_key])
    self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1887
class TestFailedToLoadExtension(tests.TestCase):
1889
def _try_loading(self):
1891
import bzrlib._fictional_extension_py
1892
except ImportError, e:
1893
osutils.failed_to_load_extension(e)
1897
super(TestFailedToLoadExtension, self).setUp()
1898
self.saved_failures = osutils._extension_load_failures[:]
1899
del osutils._extension_load_failures[:]
1900
self.addCleanup(self.restore_failures)
1902
def restore_failures(self):
    """Point osutils._extension_load_failures back at saved_failures."""
    saved = self.saved_failures
    osutils._extension_load_failures = saved
1905
def test_failure_to_load(self):
1907
self.assertLength(1, osutils._extension_load_failures)
1908
self.assertEquals(osutils._extension_load_failures[0],
1909
"No module named _fictional_extension_py")
1911
def test_report_extension_load_failures_no_warning(self):
    """Reporting extension load failures emits no Python warning."""
    attempted = self._try_loading()
    self.assertTrue(attempted)
    caught, _unused_result = self.callCatchWarnings(
        osutils.report_extension_load_failures)
    # it used to give a Python warning; it no longer does
    self.assertLength(0, caught)
1917
def test_report_extension_load_failures_message(self):
1919
trace.push_log_file(log)
1920
self.assertTrue(self._try_loading())
1921
osutils.report_extension_load_failures()
1922
self.assertContainsRe(
1924
r"bzr: warning: some compiled extensions could not be loaded; "
1925
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1929
class TestTerminalWidth(tests.TestCase):
1931
def replace_stdout(self, new):
1932
orig_stdout = sys.stdout
1934
sys.stdout = orig_stdout
1935
self.addCleanup(restore)
1938
def replace__terminal_size(self, new):
1939
orig__terminal_size = osutils._terminal_size
1941
osutils._terminal_size = orig__terminal_size
1942
self.addCleanup(restore)
1943
osutils._terminal_size = new
1945
def set_fake_tty(self):
1947
class I_am_a_tty(object):
1951
self.replace_stdout(I_am_a_tty())
1953
def test_default_values(self):
    """osutils.default_terminal_width is 80."""
    default_width = osutils.default_terminal_width
    self.assertEqual(80, default_width)
1956
def test_defaults_to_BZR_COLUMNS(self):
    """terminal_width() returns the value held in BZR_COLUMNS."""
    # BZR_COLUMNS is set by the test framework; check it differs from the
    # value used below so the final assertion proves something.
    wanted = '12'
    self.assertNotEqual(wanted, os.environ['BZR_COLUMNS'])
    os.environ['BZR_COLUMNS'] = wanted
    self.assertEqual(12, osutils.terminal_width())
1962
def test_falls_back_to_COLUMNS(self):
    """With BZR_COLUMNS removed, terminal_width() follows COLUMNS."""
    del os.environ['BZR_COLUMNS']
    # Check the existing COLUMNS value differs from the one set below.
    wanted = '42'
    self.assertNotEqual(wanted, os.environ['COLUMNS'])
    os.environ['COLUMNS'] = wanted
    self.assertEqual(42, osutils.terminal_width())
1969
def test_tty_default_without_columns(self):
1970
del os.environ['BZR_COLUMNS']
1971
del os.environ['COLUMNS']
1973
def terminal_size(w, h):
1977
# We need to override the osutils definition as it depends on the
1978
# running environment that we can't control (PQM running without a
1979
# controlling terminal is one example).
1980
self.replace__terminal_size(terminal_size)
1981
self.assertEqual(42, osutils.terminal_width())
1983
def test_non_tty_default_without_columns(self):
    """No width variables plus replace_stdout(None) gives a width of None."""
    for name in ('BZR_COLUMNS', 'COLUMNS'):
        del os.environ[name]
    self.replace_stdout(None)
    self.assertEqual(None, osutils.terminal_width())
1989
def test_no_TIOCGWINSZ(self):
1990
self.requireFeature(term_ios_feature)
1991
termios = term_ios_feature.module
1992
# bug 63539 is about a termios without TIOCGWINSZ attribute
1994
orig = termios.TIOCGWINSZ
1995
except AttributeError:
1996
# We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2000
termios.TIOCGWINSZ = orig
2001
self.addCleanup(restore)
2002
del termios.TIOCGWINSZ
2003
del os.environ['BZR_COLUMNS']
2004
del os.environ['COLUMNS']
2005
# Whatever the result is, if we don't raise an exception, it's ok.
2006
osutils.terminal_width()