17
17
"""Tests for the osutils wrapper."""
19
from cStringIO import StringIO
26
28
from bzrlib import (
31
34
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
35
from bzrlib.osutils import (
37
is_inside_or_parent_of_any,
32
42
from bzrlib.tests import (
45
probe_unicode_in_user_encoding,
54
from bzrlib.tests.file_utils import (
57
from bzrlib.tests.test__walkdirs_win32 import WalkdirsWin32Feature
60
def load_tests(standard_tests, module, loader):
61
"""Parameterize readdir tests."""
62
to_adapt, result = split_suite_by_re(standard_tests, "readdir")
63
adapter = TestScenarioApplier()
64
from bzrlib import _readdir_py
65
adapter.scenarios = [('python', {'read_dir': _readdir_py.read_dir})]
66
if ReadDirFeature.available():
67
adapter.scenarios.append(('pyrex',
68
{'read_dir': ReadDirFeature.read_dir}))
69
adapt_tests(to_adapt, adapter, result)
73
class _ReadDirFeature(Feature):
77
from bzrlib import _readdir_pyx
78
self.read_dir = _readdir_pyx.read_dir
83
def feature_name(self):
84
return 'bzrlib._readdir_pyx'
86
ReadDirFeature = _ReadDirFeature()
40
89
class TestOSUtils(TestCaseInTempDir):
105
164
self.assertFalse(is_inside('foo.c', ''))
106
165
self.assertTrue(is_inside('', 'foo.c'))
167
def test_is_inside_any(self):
168
SRC_FOO_C = pathjoin('src', 'foo.c')
169
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
170
(['src'], SRC_FOO_C),
173
self.assert_(is_inside_any(dirs, fn))
174
for dirs, fn in [(['src'], 'srccontrol'),
175
(['src'], 'srccontrol/foo')]:
176
self.assertFalse(is_inside_any(dirs, fn))
178
def test_is_inside_or_parent_of_any(self):
179
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
180
(['src'], 'src/foo.c'),
181
(['src/bar.c'], 'src'),
182
(['src/bar.c', 'bla/foo.c'], 'src'),
185
self.assert_(is_inside_or_parent_of_any(dirs, fn))
187
for dirs, fn in [(['src'], 'srccontrol'),
188
(['srccontrol/foo.c'], 'src'),
189
(['src'], 'srccontrol/foo')]:
190
self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
108
192
def test_rmtree(self):
109
193
# Check to remove tree with read-only files/dirs
268
354
osutils.make_readonly('dangling')
269
355
osutils.make_writable('dangling')
272
357
def test_kind_marker(self):
273
358
self.assertEqual("", osutils.kind_marker("file"))
274
359
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
275
360
self.assertEqual("@", osutils.kind_marker("symlink"))
276
361
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
363
def test_host_os_dereferences_symlinks(self):
364
osutils.host_os_dereferences_symlinks()
367
class TestPumpFile(TestCase):
368
"""Test pumpfile method."""
370
# create a test datablock
371
self.block_size = 512
372
pattern = '0123456789ABCDEF'
373
self.test_data = pattern * (3 * self.block_size / len(pattern))
374
self.test_data_len = len(self.test_data)
376
def test_bracket_block_size(self):
377
"""Read data in blocks with the requested read size bracketing the
379
# make sure test data is larger than max read size
380
self.assertTrue(self.test_data_len > self.block_size)
382
from_file = FakeReadFile(self.test_data)
385
# read (max / 2) bytes and verify read size wasn't affected
386
num_bytes_to_read = self.block_size / 2
387
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
388
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
389
self.assertEqual(from_file.get_read_count(), 1)
391
# read (max) bytes and verify read size wasn't affected
392
num_bytes_to_read = self.block_size
393
from_file.reset_read_count()
394
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
395
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
396
self.assertEqual(from_file.get_read_count(), 1)
398
# read (max + 1) bytes and verify read size was limited
399
num_bytes_to_read = self.block_size + 1
400
from_file.reset_read_count()
401
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
402
self.assertEqual(from_file.get_max_read_size(), self.block_size)
403
self.assertEqual(from_file.get_read_count(), 2)
405
# finish reading the rest of the data
406
num_bytes_to_read = self.test_data_len - to_file.tell()
407
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
409
# report error if the data wasn't equal (we only report the size due
410
# to the length of the data)
411
response_data = to_file.getvalue()
412
if response_data != self.test_data:
413
message = "Data not equal. Expected %d bytes, received %d."
414
self.fail(message % (len(response_data), self.test_data_len))
416
def test_specified_size(self):
417
"""Request a transfer larger than the maximum block size and verify
418
that the maximum read doesn't exceed the block_size."""
419
# make sure test data is larger than max read size
420
self.assertTrue(self.test_data_len > self.block_size)
422
# retrieve data in blocks
423
from_file = FakeReadFile(self.test_data)
425
pumpfile(from_file, to_file, self.test_data_len, self.block_size)
427
# verify read size was equal to the maximum read size
428
self.assertTrue(from_file.get_max_read_size() > 0)
429
self.assertEqual(from_file.get_max_read_size(), self.block_size)
430
self.assertEqual(from_file.get_read_count(), 3)
432
# report error if the data wasn't equal (we only report the size due
433
# to the length of the data)
434
response_data = to_file.getvalue()
435
if response_data != self.test_data:
436
message = "Data not equal. Expected %d bytes, received %d."
437
self.fail(message % (len(response_data), self.test_data_len))
439
def test_to_eof(self):
440
"""Read to end-of-file and verify that the reads are not larger than
441
the maximum read size."""
442
# make sure test data is larger than max read size
443
self.assertTrue(self.test_data_len > self.block_size)
445
# retrieve data to EOF
446
from_file = FakeReadFile(self.test_data)
448
pumpfile(from_file, to_file, -1, self.block_size)
450
# verify read size was equal to the maximum read size
451
self.assertEqual(from_file.get_max_read_size(), self.block_size)
452
self.assertEqual(from_file.get_read_count(), 4)
454
# report error if the data wasn't equal (we only report the size due
455
# to the length of the data)
456
response_data = to_file.getvalue()
457
if response_data != self.test_data:
458
message = "Data not equal. Expected %d bytes, received %d."
459
self.fail(message % (len(response_data), self.test_data_len))
461
def test_defaults(self):
462
"""Verifies that the default arguments will read to EOF -- this
463
test verifies that any existing usages of pumpfile will not be broken
464
with this new version."""
465
# retrieve data using default (old) pumpfile method
466
from_file = FakeReadFile(self.test_data)
468
pumpfile(from_file, to_file)
470
# report error if the data wasn't equal (we only report the size due
471
# to the length of the data)
472
response_data = to_file.getvalue()
473
if response_data != self.test_data:
474
message = "Data not equal. Expected %d bytes, received %d."
475
self.fail(message % (len(response_data), self.test_data_len))
478
class TestPumpStringFile(TestCase):
480
def test_empty(self):
482
pump_string_file("", output)
483
self.assertEqual("", output.getvalue())
485
def test_more_than_segment_size(self):
487
pump_string_file("123456789", output, 2)
488
self.assertEqual("123456789", output.getvalue())
490
def test_segment_size(self):
492
pump_string_file("12", output, 2)
493
self.assertEqual("12", output.getvalue())
495
def test_segment_size_multiple(self):
497
pump_string_file("1234", output, 2)
498
self.assertEqual("1234", output.getvalue())
279
501
class TestSafeUnicode(TestCase):
647
912
new_dirblock.append((info[0], info[1], info[2], info[4]))
648
913
dirblock[:] = new_dirblock
915
def test__walkdirs_utf8_selection(self):
916
# Just trigger the function once, to make sure it has selected a real
918
list(osutils._walkdirs_utf8('.'))
919
if WalkdirsWin32Feature.available():
920
# If the compiled form is available, make sure it is used
921
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
922
self.assertIs(_walkdirs_utf8_win32_find_file,
923
osutils._real_walkdirs_utf8)
924
elif sys.platform == 'win32':
925
self.assertIs(osutils._walkdirs_unicode_to_utf8,
926
osutils._real_walkdirs_utf8)
927
elif osutils._fs_enc.upper() in ('UTF-8', 'ASCII', 'ANSI_X3.4-1968'): # ascii
928
self.assertIs(osutils._walkdirs_fs_utf8,
929
osutils._real_walkdirs_utf8)
931
self.assertIs(osutils._walkdirs_unicode_to_utf8,
932
osutils._real_walkdirs_utf8)
934
def _save_platform_info(self):
935
cur_winver = win32utils.winver
936
cur_fs_enc = osutils._fs_enc
937
cur_real_walkdirs_utf8 = osutils._real_walkdirs_utf8
939
win32utils.winver = cur_winver
940
osutils._fs_enc = cur_fs_enc
941
osutils._real_walkdirs_utf8 = cur_real_walkdirs_utf8
942
self.addCleanup(restore)
944
def assertWalkdirsUtf8Is(self, expected):
945
"""Assert the right implementation for _walkdirs_utf8 is chosen."""
946
# Force it to redetect
947
osutils._real_walkdirs_utf8 = None
948
# Nothing to list, but should still trigger the selection logic
949
self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
950
self.assertIs(expected, osutils._real_walkdirs_utf8)
952
def test_force_walkdirs_utf8_fs_utf8(self):
953
self._save_platform_info()
954
win32utils.winver = None # Avoid the win32 detection code
955
osutils._fs_enc = 'UTF-8'
956
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
958
def test_force_walkdirs_utf8_fs_ascii(self):
959
self._save_platform_info()
960
win32utils.winver = None # Avoid the win32 detection code
961
osutils._fs_enc = 'US-ASCII'
962
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
964
def test_force_walkdirs_utf8_fs_ANSI(self):
965
self._save_platform_info()
966
win32utils.winver = None # Avoid the win32 detection code
967
osutils._fs_enc = 'ANSI_X3.4-1968'
968
self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
970
def test_force_walkdirs_utf8_fs_latin1(self):
971
self._save_platform_info()
972
win32utils.winver = None # Avoid the win32 detection code
973
osutils._fs_enc = 'latin1'
974
self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
976
def test_force_walkdirs_utf8_nt(self):
977
self.requireFeature(WalkdirsWin32Feature)
978
self._save_platform_info()
979
win32utils.winver = 'Windows NT'
980
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
981
self.assertWalkdirsUtf8Is(_walkdirs_utf8_win32_find_file)
983
def test_force_walkdirs_utf8_nt(self):
984
self.requireFeature(WalkdirsWin32Feature)
985
self._save_platform_info()
986
win32utils.winver = 'Windows 98'
987
self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
650
989
def test_unicode_walkdirs(self):
651
990
"""Walkdirs should always return unicode paths."""
652
991
name0 = u'0file-\xb6'
803
1142
self._filter_out_stat(result)
804
1143
self.assertEqual(expected_dirblocks, result)
1145
def test__walkdirs_utf_win32_find_file(self):
1146
self.requireFeature(WalkdirsWin32Feature)
1147
self.requireFeature(tests.UnicodeFilenameFeature)
1148
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1149
name0u = u'0file-\xb6'
1150
name1u = u'1dir-\u062c\u0648'
1151
name2u = u'2file-\u0633'
1155
name1u + '/' + name0u,
1156
name1u + '/' + name1u + '/',
1159
self.build_tree(tree)
1160
name0 = name0u.encode('utf8')
1161
name1 = name1u.encode('utf8')
1162
name2 = name2u.encode('utf8')
1164
# All of the abspaths should be in unicode, all of the relative paths
1166
expected_dirblocks = [
1168
[(name0, name0, 'file', './' + name0u),
1169
(name1, name1, 'directory', './' + name1u),
1170
(name2, name2, 'file', './' + name2u),
1173
((name1, './' + name1u),
1174
[(name1 + '/' + name0, name0, 'file', './' + name1u
1176
(name1 + '/' + name1, name1, 'directory', './' + name1u
1180
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1185
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1186
self._filter_out_stat(result)
1187
self.assertEqual(expected_dirblocks, result)
1189
def assertStatIsCorrect(self, path, win32stat):
1190
os_stat = os.stat(path)
1191
self.assertEqual(os_stat.st_size, win32stat.st_size)
1192
self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1193
self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1194
self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1195
self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1196
self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1197
self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1199
def test__walkdirs_utf_win32_find_file_stat_file(self):
1200
"""make sure our Stat values are valid"""
1201
self.requireFeature(WalkdirsWin32Feature)
1202
self.requireFeature(tests.UnicodeFilenameFeature)
1203
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1204
name0u = u'0file-\xb6'
1205
name0 = name0u.encode('utf8')
1206
self.build_tree([name0u])
1207
# I hate to sleep() here, but I'm trying to make the ctime different
1210
f = open(name0u, 'ab')
1212
f.write('just a small update')
1216
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1217
entry = result[0][1][0]
1218
self.assertEqual((name0, name0, 'file'), entry[:3])
1219
self.assertEqual(u'./' + name0u, entry[4])
1220
self.assertStatIsCorrect(entry[4], entry[3])
1221
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1223
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1224
"""make sure our Stat values are valid"""
1225
self.requireFeature(WalkdirsWin32Feature)
1226
self.requireFeature(tests.UnicodeFilenameFeature)
1227
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1228
name0u = u'0dir-\u062c\u0648'
1229
name0 = name0u.encode('utf8')
1230
self.build_tree([name0u + '/'])
1232
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1233
entry = result[0][1][0]
1234
self.assertEqual((name0, name0, 'directory'), entry[:3])
1235
self.assertEqual(u'./' + name0u, entry[4])
1236
self.assertStatIsCorrect(entry[4], entry[3])
806
1238
def assertPathCompare(self, path_less, path_greater):
807
1239
"""check that path_less and path_greater compare correctly."""
808
1240
self.assertEqual(0, osutils.compare_paths_prefix_order(
1018
1439
self.assertTrue(isinstance(offset, int))
1019
1440
eighteen_hours = 18 * 3600
1020
1441
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1444
class TestShaFileByName(TestCaseInTempDir):
1446
def test_sha_empty(self):
1447
self.build_tree_contents([('foo', '')])
1448
expected_sha = osutils.sha_string('')
1449
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1451
def test_sha_mixed_endings(self):
1452
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1453
self.build_tree_contents([('foo', text)])
1454
expected_sha = osutils.sha_string(text)
1455
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1459
r'''# Copyright (C) 2005, 2006 Canonical Ltd
1461
# This program is free software; you can redistribute it and/or modify
1462
# it under the terms of the GNU General Public License as published by
1463
# the Free Software Foundation; either version 2 of the License, or
1464
# (at your option) any later version.
1466
# This program is distributed in the hope that it will be useful,
1467
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1468
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1469
# GNU General Public License for more details.
1471
# You should have received a copy of the GNU General Public License
1472
# along with this program; if not, write to the Free Software
1473
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1476
# NOTE: If update these, please also update the help for global-options in
1477
# bzrlib/help_topics/__init__.py
1480
"""Set of flags that enable different debug behaviour.
1482
These are set with eg ``-Dlock`` on the bzr command line.
1486
* auth - show authentication sections used
1487
* error - show stack traces for all top level exceptions
1488
* evil - capture call sites that do expensive or badly-scaling operations.
1489
* fetch - trace history copying between repositories
1490
* graph - trace graph traversal information
1491
* hashcache - log every time a working file is read to determine its hash
1492
* hooks - trace hook execution
1493
* hpss - trace smart protocol requests and responses
1494
* http - trace http connections, requests and responses
1495
* index - trace major index operations
1496
* knit - trace knit operations
1497
* lock - trace when lockdir locks are taken or released
1498
* merge - emit information for debugging merges
1499
* pack - emit information about pack operations
1505
class TestResourceLoading(TestCaseInTempDir):
1507
def test_resource_string(self):
1508
# test resource in bzrlib
1509
text = osutils.resource_string('bzrlib', 'debug.py')
1510
self.assertEquals(_debug_text, text)
1511
# test resource under bzrlib
1512
text = osutils.resource_string('bzrlib.ui', 'text.py')
1513
self.assertContainsRe(text, "class TextUIFactory")
1514
# test unsupported package
1515
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1517
# test unknown resource
1518
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')