1
# Copyright (C) 2005-2010 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
35
from bzrlib.tests import (
41
class _UTF8DirReaderFeature(tests.Feature):
45
from bzrlib import _readdir_pyx
46
self.reader = _readdir_pyx.UTF8DirReader
51
def feature_name(self):
52
return 'bzrlib._readdir_pyx'
54
UTF8DirReaderFeature = _UTF8DirReaderFeature()
56
term_ios_feature = tests.ModuleAvailableFeature('termios')
59
def _already_unicode(s):
63
def _utf8_to_unicode(s):
64
return s.decode('UTF-8')
67
def dir_reader_scenarios():
68
# For each dir reader we define:
70
# - native_to_unicode: a function converting the native_abspath as returned
71
# by DirReader.read_dir to its unicode representation
73
# UnicodeDirReader is the fallback, it should be tested on all platforms.
74
scenarios = [('unicode',
75
dict(_dir_reader_class=osutils.UnicodeDirReader,
76
_native_to_unicode=_already_unicode))]
77
# Some DirReaders are platform specific and even there they may not be
79
if UTF8DirReaderFeature.available():
80
from bzrlib import _readdir_pyx
81
scenarios.append(('utf8',
82
dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
83
_native_to_unicode=_utf8_to_unicode)))
85
if test__walkdirs_win32.win32_readdir_feature.available():
87
from bzrlib import _walkdirs_win32
90
dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
91
_native_to_unicode=_already_unicode)))
97
def load_tests(basic_tests, module, loader):
98
suite = loader.suiteClass()
99
dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
100
basic_tests, tests.condition_isinstance(TestDirReader))
101
tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
102
suite.addTest(remaining_tests)
106
class TestContainsWhitespace(tests.TestCase):
108
def test_contains_whitespace(self):
109
self.failUnless(osutils.contains_whitespace(u' '))
110
self.failUnless(osutils.contains_whitespace(u'hello there'))
111
self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
112
self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
113
self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
114
self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
116
# \xa0 is "Non-breaking-space" which on some python locales thinks it
117
# is whitespace, but we do not.
118
self.failIf(osutils.contains_whitespace(u''))
119
self.failIf(osutils.contains_whitespace(u'hellothere'))
120
self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
123
class TestRename(tests.TestCaseInTempDir):
125
def create_file(self, filename, content):
126
f = open(filename, 'wb')
132
def _fancy_rename(self, a, b):
133
osutils.fancy_rename(a, b, rename_func=os.rename,
134
unlink_func=os.unlink)
136
def test_fancy_rename(self):
137
# This should work everywhere
138
self.create_file('a', 'something in a\n')
139
self._fancy_rename('a', 'b')
140
self.failIfExists('a')
141
self.failUnlessExists('b')
142
self.check_file_contents('b', 'something in a\n')
144
self.create_file('a', 'new something in a\n')
145
self._fancy_rename('b', 'a')
147
self.check_file_contents('a', 'something in a\n')
149
def test_fancy_rename_fails_source_missing(self):
150
# An exception should be raised, and the target should be left in place
151
self.create_file('target', 'data in target\n')
152
self.assertRaises((IOError, OSError), self._fancy_rename,
153
'missingsource', 'target')
154
self.failUnlessExists('target')
155
self.check_file_contents('target', 'data in target\n')
157
def test_fancy_rename_fails_if_source_and_target_missing(self):
158
self.assertRaises((IOError, OSError), self._fancy_rename,
159
'missingsource', 'missingtarget')
161
def test_rename(self):
162
# Rename should be semi-atomic on all platforms
163
self.create_file('a', 'something in a\n')
164
osutils.rename('a', 'b')
165
self.failIfExists('a')
166
self.failUnlessExists('b')
167
self.check_file_contents('b', 'something in a\n')
169
self.create_file('a', 'new something in a\n')
170
osutils.rename('b', 'a')
172
self.check_file_contents('a', 'something in a\n')
174
# TODO: test fancy_rename using a MemoryTransport
176
def test_rename_change_case(self):
177
# on Windows we should be able to change filename case by rename
178
self.build_tree(['a', 'b/'])
179
osutils.rename('a', 'A')
180
osutils.rename('b', 'B')
181
# we can't use failUnlessExists on case-insensitive filesystem
182
# so try to check shape of the tree
183
shape = sorted(os.listdir('.'))
184
self.assertEquals(['A', 'B'], shape)
187
class TestRandChars(tests.TestCase):
189
def test_01_rand_chars_empty(self):
190
result = osutils.rand_chars(0)
191
self.assertEqual(result, '')
193
def test_02_rand_chars_100(self):
194
result = osutils.rand_chars(100)
195
self.assertEqual(len(result), 100)
196
self.assertEqual(type(result), str)
197
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
200
class TestIsInside(tests.TestCase):
202
def test_is_inside(self):
203
is_inside = osutils.is_inside
204
self.assertTrue(is_inside('src', 'src/foo.c'))
205
self.assertFalse(is_inside('src', 'srccontrol'))
206
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
207
self.assertTrue(is_inside('foo.c', 'foo.c'))
208
self.assertFalse(is_inside('foo.c', ''))
209
self.assertTrue(is_inside('', 'foo.c'))
211
def test_is_inside_any(self):
212
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
213
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
214
(['src'], SRC_FOO_C),
217
self.assert_(osutils.is_inside_any(dirs, fn))
218
for dirs, fn in [(['src'], 'srccontrol'),
219
(['src'], 'srccontrol/foo')]:
220
self.assertFalse(osutils.is_inside_any(dirs, fn))
222
def test_is_inside_or_parent_of_any(self):
223
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
224
(['src'], 'src/foo.c'),
225
(['src/bar.c'], 'src'),
226
(['src/bar.c', 'bla/foo.c'], 'src'),
229
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
231
for dirs, fn in [(['src'], 'srccontrol'),
232
(['srccontrol/foo.c'], 'src'),
233
(['src'], 'srccontrol/foo')]:
234
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
237
class TestRmTree(tests.TestCaseInTempDir):
239
def test_rmtree(self):
240
# Check to remove tree with read-only files/dirs
242
f = file('dir/file', 'w')
245
# would like to also try making the directory readonly, but at the
246
# moment python shutil.rmtree doesn't handle that properly - it would
247
# need to chmod the directory before removing things inside it - deferred
248
# for now -- mbp 20060505
249
# osutils.make_readonly('dir')
250
osutils.make_readonly('dir/file')
252
osutils.rmtree('dir')
254
self.failIfExists('dir/file')
255
self.failIfExists('dir')
258
class TestDeleteAny(tests.TestCaseInTempDir):
260
def test_delete_any_readonly(self):
261
# from <https://bugs.launchpad.net/bzr/+bug/218206>
262
self.build_tree(['d/', 'f'])
263
osutils.make_readonly('d')
264
osutils.make_readonly('f')
266
osutils.delete_any('f')
267
osutils.delete_any('d')
270
class TestKind(tests.TestCaseInTempDir):
272
def test_file_kind(self):
273
self.build_tree(['file', 'dir/'])
274
self.assertEquals('file', osutils.file_kind('file'))
275
self.assertEquals('directory', osutils.file_kind('dir/'))
276
if osutils.has_symlinks():
277
os.symlink('symlink', 'symlink')
278
self.assertEquals('symlink', osutils.file_kind('symlink'))
280
# TODO: jam 20060529 Test a block device
282
os.lstat('/dev/null')
284
if e.errno not in (errno.ENOENT,):
287
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
289
mkfifo = getattr(os, 'mkfifo', None)
293
self.assertEquals('fifo', osutils.file_kind('fifo'))
297
AF_UNIX = getattr(socket, 'AF_UNIX', None)
299
s = socket.socket(AF_UNIX)
302
self.assertEquals('socket', osutils.file_kind('socket'))
306
def test_kind_marker(self):
307
self.assertEqual("", osutils.kind_marker("file"))
308
self.assertEqual("/", osutils.kind_marker('directory'))
309
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
310
self.assertEqual("@", osutils.kind_marker("symlink"))
311
self.assertEqual("+", osutils.kind_marker("tree-reference"))
312
self.assertEqual("", osutils.kind_marker("fifo"))
313
self.assertEqual("", osutils.kind_marker("socket"))
314
self.assertEqual("", osutils.kind_marker("unknown"))
317
class TestUmask(tests.TestCaseInTempDir):
319
def test_get_umask(self):
320
if sys.platform == 'win32':
321
# umask always returns '0', no way to set it
322
self.assertEqual(0, osutils.get_umask())
325
orig_umask = osutils.get_umask()
326
self.addCleanup(os.umask, orig_umask)
328
self.assertEqual(0222, osutils.get_umask())
330
self.assertEqual(0022, osutils.get_umask())
332
self.assertEqual(0002, osutils.get_umask())
334
self.assertEqual(0027, osutils.get_umask())
337
class TestDateTime(tests.TestCase):
339
def assertFormatedDelta(self, expected, seconds):
340
"""Assert osutils.format_delta formats as expected"""
341
actual = osutils.format_delta(seconds)
342
self.assertEqual(expected, actual)
344
def test_format_delta(self):
345
self.assertFormatedDelta('0 seconds ago', 0)
346
self.assertFormatedDelta('1 second ago', 1)
347
self.assertFormatedDelta('10 seconds ago', 10)
348
self.assertFormatedDelta('59 seconds ago', 59)
349
self.assertFormatedDelta('89 seconds ago', 89)
350
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
351
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
352
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
353
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
354
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
355
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
356
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
357
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
358
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
359
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
360
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
361
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
362
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
363
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
364
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
365
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
366
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
367
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
369
# We handle when time steps the wrong direction because computers
370
# don't have synchronized clocks.
371
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
372
self.assertFormatedDelta('1 second in the future', -1)
373
self.assertFormatedDelta('2 seconds in the future', -2)
375
def test_format_date(self):
376
self.assertRaises(errors.UnsupportedTimezoneFormat,
377
osutils.format_date, 0, timezone='foo')
378
self.assertIsInstance(osutils.format_date(0), str)
379
self.assertIsInstance(osutils.format_local_date(0), unicode)
380
# Testing for the actual value of the local weekday without
381
# duplicating the code from format_date is difficult.
382
# Instead blackbox.test_locale should check for localized
383
# dates once they do occur in output strings.
385
def test_format_date_with_offset_in_original_timezone(self):
386
self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
387
osutils.format_date_with_offset_in_original_timezone(0))
388
self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
389
osutils.format_date_with_offset_in_original_timezone(100000))
390
self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
391
osutils.format_date_with_offset_in_original_timezone(100000, 7200))
393
def test_local_time_offset(self):
394
"""Test that local_time_offset() returns a sane value."""
395
offset = osutils.local_time_offset()
396
self.assertTrue(isinstance(offset, int))
397
# Test that the offset is no more than a eighteen hours in
399
# Time zone handling is system specific, so it is difficult to
400
# do more specific tests, but a value outside of this range is
402
eighteen_hours = 18 * 3600
403
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
405
def test_local_time_offset_with_timestamp(self):
406
"""Test that local_time_offset() works with a timestamp."""
407
offset = osutils.local_time_offset(1000000000.1234567)
408
self.assertTrue(isinstance(offset, int))
409
eighteen_hours = 18 * 3600
410
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
413
class TestLinks(tests.TestCaseInTempDir):
415
def test_dereference_path(self):
416
self.requireFeature(tests.SymlinkFeature)
417
cwd = osutils.realpath('.')
419
bar_path = osutils.pathjoin(cwd, 'bar')
420
# Using './' to avoid bug #1213894 (first path component not
421
# dereferenced) in Python 2.4.1 and earlier
422
self.assertEqual(bar_path, osutils.realpath('./bar'))
423
os.symlink('bar', 'foo')
424
self.assertEqual(bar_path, osutils.realpath('./foo'))
426
# Does not dereference terminal symlinks
427
foo_path = osutils.pathjoin(cwd, 'foo')
428
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
430
# Dereferences parent symlinks
432
baz_path = osutils.pathjoin(bar_path, 'baz')
433
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
435
# Dereferences parent symlinks that are the first path element
436
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
438
# Dereferences parent symlinks in absolute paths
439
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
440
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
442
def test_changing_access(self):
443
f = file('file', 'w')
447
# Make a file readonly
448
osutils.make_readonly('file')
449
mode = os.lstat('file').st_mode
450
self.assertEqual(mode, mode & 0777555)
452
# Make a file writable
453
osutils.make_writable('file')
454
mode = os.lstat('file').st_mode
455
self.assertEqual(mode, mode | 0200)
457
if osutils.has_symlinks():
458
# should not error when handed a symlink
459
os.symlink('nonexistent', 'dangling')
460
osutils.make_readonly('dangling')
461
osutils.make_writable('dangling')
463
def test_host_os_dereferences_symlinks(self):
464
osutils.host_os_dereferences_symlinks()
467
class TestCanonicalRelPath(tests.TestCaseInTempDir):
469
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
471
def test_canonical_relpath_simple(self):
472
f = file('MixedCaseName', 'w')
474
actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
475
self.failUnlessEqual('work/MixedCaseName', actual)
477
def test_canonical_relpath_missing_tail(self):
478
os.mkdir('MixedCaseParent')
479
actual = osutils.canonical_relpath(self.test_base_dir,
480
'mixedcaseparent/nochild')
481
self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
484
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
486
def assertRelpath(self, expected, base, path):
487
actual = osutils._cicp_canonical_relpath(base, path)
488
self.assertEqual(expected, actual)
490
def test_simple(self):
491
self.build_tree(['MixedCaseName'])
492
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
493
self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
495
def test_subdir_missing_tail(self):
496
self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
497
base = osutils.realpath(self.get_transport('.').local_abspath('.'))
498
self.assertRelpath('MixedCaseParent/a_child', base,
499
'MixedCaseParent/a_child')
500
self.assertRelpath('MixedCaseParent/a_child', base,
501
'MixedCaseParent/A_Child')
502
self.assertRelpath('MixedCaseParent/not_child', base,
503
'MixedCaseParent/not_child')
505
def test_at_root_slash(self):
506
# We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
508
if osutils.MIN_ABS_PATHLENGTH > 1:
509
raise tests.TestSkipped('relpath requires %d chars'
510
% osutils.MIN_ABS_PATHLENGTH)
511
self.assertRelpath('foo', '/', '/foo')
513
def test_at_root_drive(self):
514
if sys.platform != 'win32':
515
raise tests.TestNotApplicable('we can only test drive-letter relative'
516
' paths on Windows where we have drive'
519
# The specific issue is that when at the root of a drive, 'abspath'
520
# returns "C:/" or just "/". However, the code assumes that abspath
521
# always returns something like "C:/foo" or "/foo" (no trailing slash).
522
self.assertRelpath('foo', 'C:/', 'C:/foo')
523
self.assertRelpath('foo', 'X:/', 'X:/foo')
524
self.assertRelpath('foo', 'X:/', 'X://foo')
527
class TestPumpFile(tests.TestCase):
528
"""Test pumpfile method."""
531
tests.TestCase.setUp(self)
532
# create a test datablock
533
self.block_size = 512
534
pattern = '0123456789ABCDEF'
535
self.test_data = pattern * (3 * self.block_size / len(pattern))
536
self.test_data_len = len(self.test_data)
538
def test_bracket_block_size(self):
539
"""Read data in blocks with the requested read size bracketing the
541
# make sure test data is larger than max read size
542
self.assertTrue(self.test_data_len > self.block_size)
544
from_file = file_utils.FakeReadFile(self.test_data)
547
# read (max / 2) bytes and verify read size wasn't affected
548
num_bytes_to_read = self.block_size / 2
549
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
550
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
551
self.assertEqual(from_file.get_read_count(), 1)
553
# read (max) bytes and verify read size wasn't affected
554
num_bytes_to_read = self.block_size
555
from_file.reset_read_count()
556
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
557
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
558
self.assertEqual(from_file.get_read_count(), 1)
560
# read (max + 1) bytes and verify read size was limited
561
num_bytes_to_read = self.block_size + 1
562
from_file.reset_read_count()
563
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
564
self.assertEqual(from_file.get_max_read_size(), self.block_size)
565
self.assertEqual(from_file.get_read_count(), 2)
567
# finish reading the rest of the data
568
num_bytes_to_read = self.test_data_len - to_file.tell()
569
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
571
# report error if the data wasn't equal (we only report the size due
572
# to the length of the data)
573
response_data = to_file.getvalue()
574
if response_data != self.test_data:
575
message = "Data not equal. Expected %d bytes, received %d."
576
self.fail(message % (len(response_data), self.test_data_len))
578
def test_specified_size(self):
579
"""Request a transfer larger than the maximum block size and verify
580
that the maximum read doesn't exceed the block_size."""
581
# make sure test data is larger than max read size
582
self.assertTrue(self.test_data_len > self.block_size)
584
# retrieve data in blocks
585
from_file = file_utils.FakeReadFile(self.test_data)
587
osutils.pumpfile(from_file, to_file, self.test_data_len,
590
# verify read size was equal to the maximum read size
591
self.assertTrue(from_file.get_max_read_size() > 0)
592
self.assertEqual(from_file.get_max_read_size(), self.block_size)
593
self.assertEqual(from_file.get_read_count(), 3)
595
# report error if the data wasn't equal (we only report the size due
596
# to the length of the data)
597
response_data = to_file.getvalue()
598
if response_data != self.test_data:
599
message = "Data not equal. Expected %d bytes, received %d."
600
self.fail(message % (len(response_data), self.test_data_len))
602
def test_to_eof(self):
603
"""Read to end-of-file and verify that the reads are not larger than
604
the maximum read size."""
605
# make sure test data is larger than max read size
606
self.assertTrue(self.test_data_len > self.block_size)
608
# retrieve data to EOF
609
from_file = file_utils.FakeReadFile(self.test_data)
611
osutils.pumpfile(from_file, to_file, -1, self.block_size)
613
# verify read size was equal to the maximum read size
614
self.assertEqual(from_file.get_max_read_size(), self.block_size)
615
self.assertEqual(from_file.get_read_count(), 4)
617
# report error if the data wasn't equal (we only report the size due
618
# to the length of the data)
619
response_data = to_file.getvalue()
620
if response_data != self.test_data:
621
message = "Data not equal. Expected %d bytes, received %d."
622
self.fail(message % (len(response_data), self.test_data_len))
624
def test_defaults(self):
625
"""Verifies that the default arguments will read to EOF -- this
626
test verifies that any existing usages of pumpfile will not be broken
627
with this new version."""
628
# retrieve data using default (old) pumpfile method
629
from_file = file_utils.FakeReadFile(self.test_data)
631
osutils.pumpfile(from_file, to_file)
633
# report error if the data wasn't equal (we only report the size due
634
# to the length of the data)
635
response_data = to_file.getvalue()
636
if response_data != self.test_data:
637
message = "Data not equal. Expected %d bytes, received %d."
638
self.fail(message % (len(response_data), self.test_data_len))
640
def test_report_activity(self):
642
def log_activity(length, direction):
643
activity.append((length, direction))
644
from_file = StringIO(self.test_data)
646
osutils.pumpfile(from_file, to_file, buff_size=500,
647
report_activity=log_activity, direction='read')
648
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
649
(36, 'read')], activity)
651
from_file = StringIO(self.test_data)
654
osutils.pumpfile(from_file, to_file, buff_size=500,
655
report_activity=log_activity, direction='write')
656
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
657
(36, 'write')], activity)
659
# And with a limited amount of data
660
from_file = StringIO(self.test_data)
663
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
664
report_activity=log_activity, direction='read')
665
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
669
class TestPumpStringFile(tests.TestCase):
671
def test_empty(self):
673
osutils.pump_string_file("", output)
674
self.assertEqual("", output.getvalue())
676
def test_more_than_segment_size(self):
678
osutils.pump_string_file("123456789", output, 2)
679
self.assertEqual("123456789", output.getvalue())
681
def test_segment_size(self):
683
osutils.pump_string_file("12", output, 2)
684
self.assertEqual("12", output.getvalue())
686
def test_segment_size_multiple(self):
688
osutils.pump_string_file("1234", output, 2)
689
self.assertEqual("1234", output.getvalue())
692
class TestRelpath(tests.TestCase):
694
def test_simple_relpath(self):
695
cwd = osutils.getcwd()
696
subdir = cwd + '/subdir'
697
self.assertEqual('subdir', osutils.relpath(cwd, subdir))
699
def test_deep_relpath(self):
700
cwd = osutils.getcwd()
701
subdir = cwd + '/sub/subsubdir'
702
self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
704
def test_not_relative(self):
705
self.assertRaises(errors.PathNotChild,
706
osutils.relpath, 'C:/path', 'H:/path')
707
self.assertRaises(errors.PathNotChild,
708
osutils.relpath, 'C:/', 'H:/path')
711
class TestSafeUnicode(tests.TestCase):
713
def test_from_ascii_string(self):
714
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
716
def test_from_unicode_string_ascii_contents(self):
717
self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
719
def test_from_unicode_string_unicode_contents(self):
720
self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
722
def test_from_utf8_string(self):
723
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
725
def test_bad_utf8_string(self):
726
self.assertRaises(errors.BzrBadParameterNotUnicode,
727
osutils.safe_unicode,
731
class TestSafeUtf8(tests.TestCase):
733
def test_from_ascii_string(self):
735
self.assertEqual('foobar', osutils.safe_utf8(f))
737
def test_from_unicode_string_ascii_contents(self):
738
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
740
def test_from_unicode_string_unicode_contents(self):
741
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
743
def test_from_utf8_string(self):
744
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
746
def test_bad_utf8_string(self):
747
self.assertRaises(errors.BzrBadParameterNotUnicode,
748
osutils.safe_utf8, '\xbb\xbb')
751
class TestSafeRevisionId(tests.TestCase):
753
def test_from_ascii_string(self):
754
# this shouldn't give a warning because it's getting an ascii string
755
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
757
def test_from_unicode_string_ascii_contents(self):
758
self.assertEqual('bargam',
759
osutils.safe_revision_id(u'bargam', warn=False))
761
def test_from_unicode_deprecated(self):
762
self.assertEqual('bargam',
763
self.callDeprecated([osutils._revision_id_warning],
764
osutils.safe_revision_id, u'bargam'))
766
def test_from_unicode_string_unicode_contents(self):
767
self.assertEqual('bargam\xc2\xae',
768
osutils.safe_revision_id(u'bargam\xae', warn=False))
770
def test_from_utf8_string(self):
771
self.assertEqual('foo\xc2\xae',
772
osutils.safe_revision_id('foo\xc2\xae'))
775
"""Currently, None is a valid revision_id"""
776
self.assertEqual(None, osutils.safe_revision_id(None))
779
class TestSafeFileId(tests.TestCase):
781
def test_from_ascii_string(self):
782
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
784
def test_from_unicode_string_ascii_contents(self):
785
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
787
def test_from_unicode_deprecated(self):
788
self.assertEqual('bargam',
789
self.callDeprecated([osutils._file_id_warning],
790
osutils.safe_file_id, u'bargam'))
792
def test_from_unicode_string_unicode_contents(self):
793
self.assertEqual('bargam\xc2\xae',
794
osutils.safe_file_id(u'bargam\xae', warn=False))
796
def test_from_utf8_string(self):
797
self.assertEqual('foo\xc2\xae',
798
osutils.safe_file_id('foo\xc2\xae'))
801
"""Currently, None is a valid revision_id"""
802
self.assertEqual(None, osutils.safe_file_id(None))
805
class TestWin32Funcs(tests.TestCase):
806
"""Test that _win32 versions of os utilities return appropriate paths."""
808
def test_abspath(self):
809
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
810
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
811
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
812
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
814
def test_realpath(self):
815
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
816
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
818
def test_pathjoin(self):
819
self.assertEqual('path/to/foo',
820
osutils._win32_pathjoin('path', 'to', 'foo'))
821
self.assertEqual('C:/foo',
822
osutils._win32_pathjoin('path\\to', 'C:\\foo'))
823
self.assertEqual('C:/foo',
824
osutils._win32_pathjoin('path/to', 'C:/foo'))
825
self.assertEqual('path/to/foo',
826
osutils._win32_pathjoin('path/to/', 'foo'))
827
self.assertEqual('/foo',
828
osutils._win32_pathjoin('C:/path/to/', '/foo'))
829
self.assertEqual('/foo',
830
osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
832
def test_normpath(self):
833
self.assertEqual('path/to/foo',
834
osutils._win32_normpath(r'path\\from\..\to\.\foo'))
835
self.assertEqual('path/to/foo',
836
osutils._win32_normpath('path//from/../to/./foo'))
838
def test_getcwd(self):
839
cwd = osutils._win32_getcwd()
840
os_cwd = os.getcwdu()
841
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
842
# win32 is inconsistent whether it returns lower or upper case
843
# and even if it was consistent the user might type the other
844
# so we force it to uppercase
845
# running python.exe under cmd.exe return capital C:\\
846
# running win32 python inside a cygwin shell returns lowercase
847
self.assertEqual(os_cwd[0].upper(), cwd[0])
849
def test_fixdrive(self):
850
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
851
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
852
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
854
def test_win98_abspath(self):
856
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
857
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
859
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
860
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
862
cwd = osutils.getcwd().rstrip('/')
863
drive = osutils._nt_splitdrive(cwd)[0]
864
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
865
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
868
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
871
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
872
"""Test win32 functions that create files."""
874
def test_getcwd(self):
875
self.requireFeature(tests.UnicodeFilenameFeature)
878
# TODO: jam 20060427 This will probably fail on Mac OSX because
879
# it will change the normalization of B\xe5gfors
880
# Consider using a different unicode character, or make
881
# osutils.getcwd() renormalize the path.
882
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
884
def test_minimum_path_selection(self):
885
self.assertEqual(set(),
886
osutils.minimum_path_selection([]))
887
self.assertEqual(set(['a']),
888
osutils.minimum_path_selection(['a']))
889
self.assertEqual(set(['a', 'b']),
890
osutils.minimum_path_selection(['a', 'b']))
891
self.assertEqual(set(['a/', 'b']),
892
osutils.minimum_path_selection(['a/', 'b']))
893
self.assertEqual(set(['a/', 'b']),
894
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
895
self.assertEqual(set(['a-b', 'a', 'a0b']),
896
osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
898
def test_mkdtemp(self):
899
tmpdir = osutils._win32_mkdtemp(dir='.')
900
self.assertFalse('\\' in tmpdir)
902
def test_rename(self):
910
osutils._win32_rename('b', 'a')
911
self.failUnlessExists('a')
912
self.failIfExists('b')
913
self.assertFileEqual('baz\n', 'a')
915
def test_rename_missing_file(self):
921
osutils._win32_rename('b', 'a')
922
except (IOError, OSError), e:
923
self.assertEqual(errno.ENOENT, e.errno)
924
self.assertFileEqual('foo\n', 'a')
926
def test_rename_missing_dir(self):
929
osutils._win32_rename('b', 'a')
930
except (IOError, OSError), e:
931
self.assertEqual(errno.ENOENT, e.errno)
933
def test_rename_current_dir(self):
936
# You can't rename the working directory
937
# doing rename non-existant . usually
938
# just raises ENOENT, since non-existant
941
osutils._win32_rename('b', '.')
942
except (IOError, OSError), e:
943
self.assertEqual(errno.ENOENT, e.errno)
945
def test_splitpath(self):
946
def check(expected, path):
947
self.assertEqual(expected, osutils.splitpath(path))
950
check(['a', 'b'], 'a/b')
951
check(['a', 'b'], 'a/./b')
952
check(['a', '.b'], 'a/.b')
953
check(['a', '.b'], 'a\\.b')
955
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
958
class TestParentDirectories(tests.TestCaseInTempDir):
959
"""Test osutils.parent_directories()"""
961
def test_parent_directories(self):
962
self.assertEqual([], osutils.parent_directories('a'))
963
self.assertEqual(['a'], osutils.parent_directories('a/b'))
964
self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
967
class TestMacFuncsDirs(tests.TestCaseInTempDir):
968
"""Test mac special functions that require directories."""
970
def test_getcwd(self):
971
self.requireFeature(tests.UnicodeFilenameFeature)
972
os.mkdir(u'B\xe5gfors')
973
os.chdir(u'B\xe5gfors')
974
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
976
def test_getcwd_nonnorm(self):
977
self.requireFeature(tests.UnicodeFilenameFeature)
978
# Test that _mac_getcwd() will normalize this path
979
os.mkdir(u'Ba\u030agfors')
980
os.chdir(u'Ba\u030agfors')
981
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
984
class TestChunksToLines(tests.TestCase):
986
def test_smoketest(self):
987
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
988
osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
989
self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
990
osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
992
def test_osutils_binding(self):
993
from bzrlib.tests import test__chunks_to_lines
994
if test__chunks_to_lines.compiled_chunkstolines_feature.available():
995
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
997
from bzrlib._chunks_to_lines_py import chunks_to_lines
998
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
1001
class TestSplitLines(tests.TestCase):
1003
def test_split_unicode(self):
1004
self.assertEqual([u'foo\n', u'bar\xae'],
1005
osutils.split_lines(u'foo\nbar\xae'))
1006
self.assertEqual([u'foo\n', u'bar\xae\n'],
1007
osutils.split_lines(u'foo\nbar\xae\n'))
1009
def test_split_with_carriage_returns(self):
1010
self.assertEqual(['foo\rbar\n'],
1011
osutils.split_lines('foo\rbar\n'))
1014
class TestWalkDirs(tests.TestCaseInTempDir):
1016
def assertExpectedBlocks(self, expected, result):
1017
self.assertEqual(expected,
1018
[(dirinfo, [line[0:3] for line in block])
1019
for dirinfo, block in result])
1021
def test_walkdirs(self):
1030
self.build_tree(tree)
1031
expected_dirblocks = [
1033
[('0file', '0file', 'file'),
1034
('1dir', '1dir', 'directory'),
1035
('2file', '2file', 'file'),
1038
(('1dir', './1dir'),
1039
[('1dir/0file', '0file', 'file'),
1040
('1dir/1dir', '1dir', 'directory'),
1043
(('1dir/1dir', './1dir/1dir'),
1049
found_bzrdir = False
1050
for dirdetail, dirblock in osutils.walkdirs('.'):
1051
if len(dirblock) and dirblock[0][1] == '.bzr':
1052
# this tests the filtering of selected paths
1055
result.append((dirdetail, dirblock))
1057
self.assertTrue(found_bzrdir)
1058
self.assertExpectedBlocks(expected_dirblocks, result)
1059
# you can search a subdir only, with a supplied prefix.
1061
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1062
result.append(dirblock)
1063
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1065
def test_walkdirs_os_error(self):
1066
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1067
# Pyrex readdir didn't raise useful messages if it had an error
1068
# reading the directory
1069
if sys.platform == 'win32':
1070
raise tests.TestNotApplicable(
1071
"readdir IOError not tested on win32")
1072
os.mkdir("test-unreadable")
1073
os.chmod("test-unreadable", 0000)
1074
# must chmod it back so that it can be removed
1075
self.addCleanup(os.chmod, "test-unreadable", 0700)
1076
# The error is not raised until the generator is actually evaluated.
1077
# (It would be ok if it happened earlier but at the moment it
1079
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1080
self.assertEquals('./test-unreadable', e.filename)
1081
self.assertEquals(errno.EACCES, e.errno)
1082
# Ensure the message contains the file name
1083
self.assertContainsRe(str(e), "\./test-unreadable")
1085
def test__walkdirs_utf8(self):
1094
self.build_tree(tree)
1095
expected_dirblocks = [
1097
[('0file', '0file', 'file'),
1098
('1dir', '1dir', 'directory'),
1099
('2file', '2file', 'file'),
1102
(('1dir', './1dir'),
1103
[('1dir/0file', '0file', 'file'),
1104
('1dir/1dir', '1dir', 'directory'),
1107
(('1dir/1dir', './1dir/1dir'),
1113
found_bzrdir = False
1114
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1115
if len(dirblock) and dirblock[0][1] == '.bzr':
1116
# this tests the filtering of selected paths
1119
result.append((dirdetail, dirblock))
1121
self.assertTrue(found_bzrdir)
1122
self.assertExpectedBlocks(expected_dirblocks, result)
1124
# you can search a subdir only, with a supplied prefix.
1126
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1127
result.append(dirblock)
1128
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1130
def _filter_out_stat(self, result):
1131
"""Filter out the stat value from the walkdirs result"""
1132
for dirdetail, dirblock in result:
1134
for info in dirblock:
1135
# Ignore info[3] which is the stat
1136
new_dirblock.append((info[0], info[1], info[2], info[4]))
1137
dirblock[:] = new_dirblock
1139
def _save_platform_info(self):
1140
self.overrideAttr(win32utils, 'winver')
1141
self.overrideAttr(osutils, '_fs_enc')
1142
self.overrideAttr(osutils, '_selected_dir_reader')
1144
def assertDirReaderIs(self, expected):
1145
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
1146
# Force it to redetect
1147
osutils._selected_dir_reader = None
1148
# Nothing to list, but should still trigger the selection logic
1149
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1150
self.assertIsInstance(osutils._selected_dir_reader, expected)
1152
def test_force_walkdirs_utf8_fs_utf8(self):
1153
self.requireFeature(UTF8DirReaderFeature)
1154
self._save_platform_info()
1155
win32utils.winver = None # Avoid the win32 detection code
1156
osutils._fs_enc = 'UTF-8'
1157
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1159
def test_force_walkdirs_utf8_fs_ascii(self):
1160
self.requireFeature(UTF8DirReaderFeature)
1161
self._save_platform_info()
1162
win32utils.winver = None # Avoid the win32 detection code
1163
osutils._fs_enc = 'US-ASCII'
1164
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1166
def test_force_walkdirs_utf8_fs_ANSI(self):
1167
self.requireFeature(UTF8DirReaderFeature)
1168
self._save_platform_info()
1169
win32utils.winver = None # Avoid the win32 detection code
1170
osutils._fs_enc = 'ANSI_X3.4-1968'
1171
self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1173
def test_force_walkdirs_utf8_fs_latin1(self):
1174
self._save_platform_info()
1175
win32utils.winver = None # Avoid the win32 detection code
1176
osutils._fs_enc = 'latin1'
1177
self.assertDirReaderIs(osutils.UnicodeDirReader)
1179
def test_force_walkdirs_utf8_nt(self):
1180
# Disabled because the thunk of the whole walkdirs api is disabled.
1181
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1182
self._save_platform_info()
1183
win32utils.winver = 'Windows NT'
1184
from bzrlib._walkdirs_win32 import Win32ReadDir
1185
self.assertDirReaderIs(Win32ReadDir)
1187
def test_force_walkdirs_utf8_98(self):
1188
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1189
self._save_platform_info()
1190
win32utils.winver = 'Windows 98'
1191
self.assertDirReaderIs(osutils.UnicodeDirReader)
1193
def test_unicode_walkdirs(self):
1194
"""Walkdirs should always return unicode paths."""
1195
self.requireFeature(tests.UnicodeFilenameFeature)
1196
name0 = u'0file-\xb6'
1197
name1 = u'1dir-\u062c\u0648'
1198
name2 = u'2file-\u0633'
1202
name1 + '/' + name0,
1203
name1 + '/' + name1 + '/',
1206
self.build_tree(tree)
1207
expected_dirblocks = [
1209
[(name0, name0, 'file', './' + name0),
1210
(name1, name1, 'directory', './' + name1),
1211
(name2, name2, 'file', './' + name2),
1214
((name1, './' + name1),
1215
[(name1 + '/' + name0, name0, 'file', './' + name1
1217
(name1 + '/' + name1, name1, 'directory', './' + name1
1221
((name1 + '/' + name1, './' + name1 + '/' + name1),
1226
result = list(osutils.walkdirs('.'))
1227
self._filter_out_stat(result)
1228
self.assertEqual(expected_dirblocks, result)
1229
result = list(osutils.walkdirs(u'./'+name1, name1))
1230
self._filter_out_stat(result)
1231
self.assertEqual(expected_dirblocks[1:], result)
1233
def test_unicode__walkdirs_utf8(self):
1234
"""Walkdirs_utf8 should always return utf8 paths.
1236
The abspath portion might be in unicode or utf-8
1238
self.requireFeature(tests.UnicodeFilenameFeature)
1239
name0 = u'0file-\xb6'
1240
name1 = u'1dir-\u062c\u0648'
1241
name2 = u'2file-\u0633'
1245
name1 + '/' + name0,
1246
name1 + '/' + name1 + '/',
1249
self.build_tree(tree)
1250
name0 = name0.encode('utf8')
1251
name1 = name1.encode('utf8')
1252
name2 = name2.encode('utf8')
1254
expected_dirblocks = [
1256
[(name0, name0, 'file', './' + name0),
1257
(name1, name1, 'directory', './' + name1),
1258
(name2, name2, 'file', './' + name2),
1261
((name1, './' + name1),
1262
[(name1 + '/' + name0, name0, 'file', './' + name1
1264
(name1 + '/' + name1, name1, 'directory', './' + name1
1268
((name1 + '/' + name1, './' + name1 + '/' + name1),
1274
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1275
# all abspaths are Unicode, and encode them back into utf8.
1276
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1277
self.assertIsInstance(dirdetail[0], str)
1278
if isinstance(dirdetail[1], unicode):
1279
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1280
dirblock = [list(info) for info in dirblock]
1281
for info in dirblock:
1282
self.assertIsInstance(info[4], unicode)
1283
info[4] = info[4].encode('utf8')
1285
for info in dirblock:
1286
self.assertIsInstance(info[0], str)
1287
self.assertIsInstance(info[1], str)
1288
self.assertIsInstance(info[4], str)
1289
# Remove the stat information
1290
new_dirblock.append((info[0], info[1], info[2], info[4]))
1291
result.append((dirdetail, new_dirblock))
1292
self.assertEqual(expected_dirblocks, result)
1294
def test__walkdirs_utf8_with_unicode_fs(self):
1295
"""UnicodeDirReader should be a safe fallback everywhere
1297
The abspath portion should be in unicode
1299
self.requireFeature(tests.UnicodeFilenameFeature)
1300
# Use the unicode reader. TODO: split into driver-and-driven unit
1302
self._save_platform_info()
1303
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1304
name0u = u'0file-\xb6'
1305
name1u = u'1dir-\u062c\u0648'
1306
name2u = u'2file-\u0633'
1310
name1u + '/' + name0u,
1311
name1u + '/' + name1u + '/',
1314
self.build_tree(tree)
1315
name0 = name0u.encode('utf8')
1316
name1 = name1u.encode('utf8')
1317
name2 = name2u.encode('utf8')
1319
# All of the abspaths should be in unicode, all of the relative paths
1321
expected_dirblocks = [
1323
[(name0, name0, 'file', './' + name0u),
1324
(name1, name1, 'directory', './' + name1u),
1325
(name2, name2, 'file', './' + name2u),
1328
((name1, './' + name1u),
1329
[(name1 + '/' + name0, name0, 'file', './' + name1u
1331
(name1 + '/' + name1, name1, 'directory', './' + name1u
1335
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1340
result = list(osutils._walkdirs_utf8('.'))
1341
self._filter_out_stat(result)
1342
self.assertEqual(expected_dirblocks, result)
1344
def test__walkdirs_utf8_win32readdir(self):
1345
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1346
self.requireFeature(tests.UnicodeFilenameFeature)
1347
from bzrlib._walkdirs_win32 import Win32ReadDir
1348
self._save_platform_info()
1349
osutils._selected_dir_reader = Win32ReadDir()
1350
name0u = u'0file-\xb6'
1351
name1u = u'1dir-\u062c\u0648'
1352
name2u = u'2file-\u0633'
1356
name1u + '/' + name0u,
1357
name1u + '/' + name1u + '/',
1360
self.build_tree(tree)
1361
name0 = name0u.encode('utf8')
1362
name1 = name1u.encode('utf8')
1363
name2 = name2u.encode('utf8')
1365
# All of the abspaths should be in unicode, all of the relative paths
1367
expected_dirblocks = [
1369
[(name0, name0, 'file', './' + name0u),
1370
(name1, name1, 'directory', './' + name1u),
1371
(name2, name2, 'file', './' + name2u),
1374
((name1, './' + name1u),
1375
[(name1 + '/' + name0, name0, 'file', './' + name1u
1377
(name1 + '/' + name1, name1, 'directory', './' + name1u
1381
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1386
result = list(osutils._walkdirs_utf8(u'.'))
1387
self._filter_out_stat(result)
1388
self.assertEqual(expected_dirblocks, result)
1390
def assertStatIsCorrect(self, path, win32stat):
1391
os_stat = os.stat(path)
1392
self.assertEqual(os_stat.st_size, win32stat.st_size)
1393
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1394
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1395
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1396
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1397
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1398
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1400
def test__walkdirs_utf_win32_find_file_stat_file(self):
1401
"""make sure our Stat values are valid"""
1402
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1403
self.requireFeature(tests.UnicodeFilenameFeature)
1404
from bzrlib._walkdirs_win32 import Win32ReadDir
1405
name0u = u'0file-\xb6'
1406
name0 = name0u.encode('utf8')
1407
self.build_tree([name0u])
1408
# I hate to sleep() here, but I'm trying to make the ctime different
1411
f = open(name0u, 'ab')
1413
f.write('just a small update')
1417
result = Win32ReadDir().read_dir('', u'.')
1419
self.assertEqual((name0, name0, 'file'), entry[:3])
1420
self.assertEqual(u'./' + name0u, entry[4])
1421
self.assertStatIsCorrect(entry[4], entry[3])
1422
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1424
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1425
"""make sure our Stat values are valid"""
1426
self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1427
self.requireFeature(tests.UnicodeFilenameFeature)
1428
from bzrlib._walkdirs_win32 import Win32ReadDir
1429
name0u = u'0dir-\u062c\u0648'
1430
name0 = name0u.encode('utf8')
1431
self.build_tree([name0u + '/'])
1433
result = Win32ReadDir().read_dir('', u'.')
1435
self.assertEqual((name0, name0, 'directory'), entry[:3])
1436
self.assertEqual(u'./' + name0u, entry[4])
1437
self.assertStatIsCorrect(entry[4], entry[3])
1439
def assertPathCompare(self, path_less, path_greater):
1440
"""check that path_less and path_greater compare correctly."""
1441
self.assertEqual(0, osutils.compare_paths_prefix_order(
1442
path_less, path_less))
1443
self.assertEqual(0, osutils.compare_paths_prefix_order(
1444
path_greater, path_greater))
1445
self.assertEqual(-1, osutils.compare_paths_prefix_order(
1446
path_less, path_greater))
1447
self.assertEqual(1, osutils.compare_paths_prefix_order(
1448
path_greater, path_less))
1450
def test_compare_paths_prefix_order(self):
1451
# root before all else
1452
self.assertPathCompare("/", "/a")
1453
# alpha within a dir
1454
self.assertPathCompare("/a", "/b")
1455
self.assertPathCompare("/b", "/z")
1456
# high dirs before lower.
1457
self.assertPathCompare("/z", "/a/a")
1458
# except if the deeper dir should be output first
1459
self.assertPathCompare("/a/b/c", "/d/g")
1460
# lexical betwen dirs of the same height
1461
self.assertPathCompare("/a/z", "/z/z")
1462
self.assertPathCompare("/a/c/z", "/a/d/e")
1464
# this should also be consistent for no leading / paths
1465
# root before all else
1466
self.assertPathCompare("", "a")
1467
# alpha within a dir
1468
self.assertPathCompare("a", "b")
1469
self.assertPathCompare("b", "z")
1470
# high dirs before lower.
1471
self.assertPathCompare("z", "a/a")
1472
# except if the deeper dir should be output first
1473
self.assertPathCompare("a/b/c", "d/g")
1474
# lexical betwen dirs of the same height
1475
self.assertPathCompare("a/z", "z/z")
1476
self.assertPathCompare("a/c/z", "a/d/e")
1478
def test_path_prefix_sorting(self):
1479
"""Doing a sort on path prefix should match our sample data."""
1494
dir_sorted_paths = [
1510
sorted(original_paths, key=osutils.path_prefix_key))
1511
# using the comparison routine shoudl work too:
1514
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1517
class TestCopyTree(tests.TestCaseInTempDir):
1519
def test_copy_basic_tree(self):
1520
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1521
osutils.copy_tree('source', 'target')
1522
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1523
self.assertEqual(['c'], os.listdir('target/b'))
1525
def test_copy_tree_target_exists(self):
1526
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1528
osutils.copy_tree('source', 'target')
1529
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1530
self.assertEqual(['c'], os.listdir('target/b'))
1532
def test_copy_tree_symlinks(self):
1533
self.requireFeature(tests.SymlinkFeature)
1534
self.build_tree(['source/'])
1535
os.symlink('a/generic/path', 'source/lnk')
1536
osutils.copy_tree('source', 'target')
1537
self.assertEqual(['lnk'], os.listdir('target'))
1538
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1540
def test_copy_tree_handlers(self):
1541
processed_files = []
1542
processed_links = []
1543
def file_handler(from_path, to_path):
1544
processed_files.append(('f', from_path, to_path))
1545
def dir_handler(from_path, to_path):
1546
processed_files.append(('d', from_path, to_path))
1547
def link_handler(from_path, to_path):
1548
processed_links.append((from_path, to_path))
1549
handlers = {'file':file_handler,
1550
'directory':dir_handler,
1551
'symlink':link_handler,
1554
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1555
if osutils.has_symlinks():
1556
os.symlink('a/generic/path', 'source/lnk')
1557
osutils.copy_tree('source', 'target', handlers=handlers)
1559
self.assertEqual([('d', 'source', 'target'),
1560
('f', 'source/a', 'target/a'),
1561
('d', 'source/b', 'target/b'),
1562
('f', 'source/b/c', 'target/b/c'),
1564
self.failIfExists('target')
1565
if osutils.has_symlinks():
1566
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1569
class TestSetUnsetEnv(tests.TestCase):
1570
"""Test updating the environment"""
1573
super(TestSetUnsetEnv, self).setUp()
1575
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1576
'Environment was not cleaned up properly.'
1577
' Variable BZR_TEST_ENV_VAR should not exist.')
1579
if 'BZR_TEST_ENV_VAR' in os.environ:
1580
del os.environ['BZR_TEST_ENV_VAR']
1581
self.addCleanup(cleanup)
1584
"""Test that we can set an env variable"""
1585
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1586
self.assertEqual(None, old)
1587
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1589
def test_double_set(self):
1590
"""Test that we get the old value out"""
1591
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1592
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1593
self.assertEqual('foo', old)
1594
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1596
def test_unicode(self):
1597
"""Environment can only contain plain strings
1599
So Unicode strings must be encoded.
1601
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1603
raise tests.TestSkipped(
1604
'Cannot find a unicode character that works in encoding %s'
1605
% (osutils.get_user_encoding(),))
1607
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1608
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1610
def test_unset(self):
1611
"""Test that passing None will remove the env var"""
1612
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1613
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1614
self.assertEqual('foo', old)
1615
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1616
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1619
class TestSizeShaFile(tests.TestCaseInTempDir):
1621
def test_sha_empty(self):
1622
self.build_tree_contents([('foo', '')])
1623
expected_sha = osutils.sha_string('')
1625
self.addCleanup(f.close)
1626
size, sha = osutils.size_sha_file(f)
1627
self.assertEqual(0, size)
1628
self.assertEqual(expected_sha, sha)
1630
def test_sha_mixed_endings(self):
1631
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1632
self.build_tree_contents([('foo', text)])
1633
expected_sha = osutils.sha_string(text)
1634
f = open('foo', 'rb')
1635
self.addCleanup(f.close)
1636
size, sha = osutils.size_sha_file(f)
1637
self.assertEqual(38, size)
1638
self.assertEqual(expected_sha, sha)
1641
class TestShaFileByName(tests.TestCaseInTempDir):
1643
def test_sha_empty(self):
1644
self.build_tree_contents([('foo', '')])
1645
expected_sha = osutils.sha_string('')
1646
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1648
def test_sha_mixed_endings(self):
1649
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1650
self.build_tree_contents([('foo', text)])
1651
expected_sha = osutils.sha_string(text)
1652
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1655
class TestResourceLoading(tests.TestCaseInTempDir):
1657
def test_resource_string(self):
1658
# test resource in bzrlib
1659
text = osutils.resource_string('bzrlib', 'debug.py')
1660
self.assertContainsRe(text, "debug_flags = set()")
1661
# test resource under bzrlib
1662
text = osutils.resource_string('bzrlib.ui', 'text.py')
1663
self.assertContainsRe(text, "class TextUIFactory")
1664
# test unsupported package
1665
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1667
# test unknown resource
1668
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1671
class TestReCompile(tests.TestCase):
1673
def test_re_compile_checked(self):
1674
r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1675
self.assertTrue(r.match('aaaa'))
1676
self.assertTrue(r.match('aAaA'))
1678
def test_re_compile_checked_error(self):
1679
# like https://bugs.launchpad.net/bzr/+bug/251352
1680
err = self.assertRaises(
1681
errors.BzrCommandError,
1682
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1684
"Invalid regular expression in test case: '*': "
1685
"nothing to repeat",
1689
class TestDirReader(tests.TestCaseInTempDir):
1692
_dir_reader_class = None
1693
_native_to_unicode = None
1696
tests.TestCaseInTempDir.setUp(self)
1697
self.overrideAttr(osutils,
1698
'_selected_dir_reader', self._dir_reader_class())
1700
def _get_ascii_tree(self):
1708
expected_dirblocks = [
1710
[('0file', '0file', 'file'),
1711
('1dir', '1dir', 'directory'),
1712
('2file', '2file', 'file'),
1715
(('1dir', './1dir'),
1716
[('1dir/0file', '0file', 'file'),
1717
('1dir/1dir', '1dir', 'directory'),
1720
(('1dir/1dir', './1dir/1dir'),
1725
return tree, expected_dirblocks
1727
def test_walk_cur_dir(self):
1728
tree, expected_dirblocks = self._get_ascii_tree()
1729
self.build_tree(tree)
1730
result = list(osutils._walkdirs_utf8('.'))
1731
# Filter out stat and abspath
1732
self.assertEqual(expected_dirblocks,
1733
[(dirinfo, [line[0:3] for line in block])
1734
for dirinfo, block in result])
1736
def test_walk_sub_dir(self):
1737
tree, expected_dirblocks = self._get_ascii_tree()
1738
self.build_tree(tree)
1739
# you can search a subdir only, with a supplied prefix.
1740
result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1741
# Filter out stat and abspath
1742
self.assertEqual(expected_dirblocks[1:],
1743
[(dirinfo, [line[0:3] for line in block])
1744
for dirinfo, block in result])
1746
def _get_unicode_tree(self):
1747
name0u = u'0file-\xb6'
1748
name1u = u'1dir-\u062c\u0648'
1749
name2u = u'2file-\u0633'
1753
name1u + '/' + name0u,
1754
name1u + '/' + name1u + '/',
1757
name0 = name0u.encode('UTF-8')
1758
name1 = name1u.encode('UTF-8')
1759
name2 = name2u.encode('UTF-8')
1760
expected_dirblocks = [
1762
[(name0, name0, 'file', './' + name0u),
1763
(name1, name1, 'directory', './' + name1u),
1764
(name2, name2, 'file', './' + name2u),
1767
((name1, './' + name1u),
1768
[(name1 + '/' + name0, name0, 'file', './' + name1u
1770
(name1 + '/' + name1, name1, 'directory', './' + name1u
1774
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1779
return tree, expected_dirblocks
1781
def _filter_out(self, raw_dirblocks):
1782
"""Filter out a walkdirs_utf8 result.
1784
stat field is removed, all native paths are converted to unicode
1786
filtered_dirblocks = []
1787
for dirinfo, block in raw_dirblocks:
1788
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1791
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1792
filtered_dirblocks.append((dirinfo, details))
1793
return filtered_dirblocks
1795
def test_walk_unicode_tree(self):
1796
self.requireFeature(tests.UnicodeFilenameFeature)
1797
tree, expected_dirblocks = self._get_unicode_tree()
1798
self.build_tree(tree)
1799
result = list(osutils._walkdirs_utf8('.'))
1800
self.assertEqual(expected_dirblocks, self._filter_out(result))
1802
def test_symlink(self):
1803
self.requireFeature(tests.SymlinkFeature)
1804
self.requireFeature(tests.UnicodeFilenameFeature)
1805
target = u'target\N{Euro Sign}'
1806
link_name = u'l\N{Euro Sign}nk'
1807
os.symlink(target, link_name)
1808
target_utf8 = target.encode('UTF-8')
1809
link_name_utf8 = link_name.encode('UTF-8')
1810
expected_dirblocks = [
1812
[(link_name_utf8, link_name_utf8,
1813
'symlink', './' + link_name),],
1815
result = list(osutils._walkdirs_utf8('.'))
1816
self.assertEqual(expected_dirblocks, self._filter_out(result))
1819
class TestReadLink(tests.TestCaseInTempDir):
1820
"""Exposes os.readlink() problems and the osutils solution.
1822
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1823
unicode string will be returned if a unicode string is passed.
1825
But prior python versions failed to properly encode the passed unicode
1828
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1831
super(tests.TestCaseInTempDir, self).setUp()
1832
self.link = u'l\N{Euro Sign}ink'
1833
self.target = u'targe\N{Euro Sign}t'
1834
os.symlink(self.target, self.link)
1836
def test_os_readlink_link_encoding(self):
1837
if sys.version_info < (2, 6):
1838
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1840
self.assertEquals(self.target, os.readlink(self.link))
1842
def test_os_readlink_link_decoding(self):
1843
self.assertEquals(self.target.encode(osutils._fs_enc),
1844
os.readlink(self.link.encode(osutils._fs_enc)))
1847
class TestConcurrency(tests.TestCase):
1850
super(TestConcurrency, self).setUp()
1851
self.overrideAttr(osutils, '_cached_local_concurrency')
1853
def test_local_concurrency(self):
1854
concurrency = osutils.local_concurrency()
1855
self.assertIsInstance(concurrency, int)
1857
def test_local_concurrency_environment_variable(self):
1858
os.environ['BZR_CONCURRENCY'] = '2'
1859
self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1860
os.environ['BZR_CONCURRENCY'] = '3'
1861
self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1862
os.environ['BZR_CONCURRENCY'] = 'foo'
1863
self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1865
def test_option_concurrency(self):
1866
os.environ['BZR_CONCURRENCY'] = '1'
1867
self.run_bzr('rocks --concurrency 42')
1868
# Command line overrides envrionment variable
1869
self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1870
self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1873
class TestFailedToLoadExtension(tests.TestCase):
1875
def _try_loading(self):
1877
import bzrlib._fictional_extension_py
1878
except ImportError, e:
1879
osutils.failed_to_load_extension(e)
1883
super(TestFailedToLoadExtension, self).setUp()
1884
self.overrideAttr(osutils, '_extension_load_failures', [])
1886
def test_failure_to_load(self):
1888
self.assertLength(1, osutils._extension_load_failures)
1889
self.assertEquals(osutils._extension_load_failures[0],
1890
"No module named _fictional_extension_py")
1892
def test_report_extension_load_failures_no_warning(self):
1893
self.assertTrue(self._try_loading())
1894
warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1895
# it used to give a Python warning; it no longer does
1896
self.assertLength(0, warnings)
1898
def test_report_extension_load_failures_message(self):
1900
trace.push_log_file(log)
1901
self.assertTrue(self._try_loading())
1902
osutils.report_extension_load_failures()
1903
self.assertContainsRe(
1905
r"bzr: warning: some compiled extensions could not be loaded; "
1906
"see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1910
class TestTerminalWidth(tests.TestCase):
1912
def replace_stdout(self, new):
1913
self.overrideAttr(sys, 'stdout', new)
1915
def replace__terminal_size(self, new):
1916
self.overrideAttr(osutils, '_terminal_size', new)
1918
def set_fake_tty(self):
1920
class I_am_a_tty(object):
1924
self.replace_stdout(I_am_a_tty())
1926
def test_default_values(self):
1927
self.assertEqual(80, osutils.default_terminal_width)
1929
def test_defaults_to_BZR_COLUMNS(self):
1930
# BZR_COLUMNS is set by the test framework
1931
self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1932
os.environ['BZR_COLUMNS'] = '12'
1933
self.assertEqual(12, osutils.terminal_width())
1935
def test_falls_back_to_COLUMNS(self):
1936
del os.environ['BZR_COLUMNS']
1937
self.assertNotEqual('42', os.environ['COLUMNS'])
1939
os.environ['COLUMNS'] = '42'
1940
self.assertEqual(42, osutils.terminal_width())
1942
def test_tty_default_without_columns(self):
1943
del os.environ['BZR_COLUMNS']
1944
del os.environ['COLUMNS']
1946
def terminal_size(w, h):
1950
# We need to override the osutils definition as it depends on the
1951
# running environment that we can't control (PQM running without a
1952
# controlling terminal is one example).
1953
self.replace__terminal_size(terminal_size)
1954
self.assertEqual(42, osutils.terminal_width())
1956
def test_non_tty_default_without_columns(self):
1957
del os.environ['BZR_COLUMNS']
1958
del os.environ['COLUMNS']
1959
self.replace_stdout(None)
1960
self.assertEqual(None, osutils.terminal_width())
1962
def test_no_TIOCGWINSZ(self):
1963
self.requireFeature(term_ios_feature)
1964
termios = term_ios_feature.module
1965
# bug 63539 is about a termios without TIOCGWINSZ attribute
1967
orig = termios.TIOCGWINSZ
1968
except AttributeError:
1969
# We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
1972
self.overrideAttr(termios, 'TIOCGWINSZ')
1973
del termios.TIOCGWINSZ
1974
del os.environ['BZR_COLUMNS']
1975
del os.environ['COLUMNS']
1976
# Whatever the result is, if we don't raise an exception, it's ok.
1977
osutils.terminal_width()