127
230
os.remove('socket')
232
def test_kind_marker(self):
    # Each (kind, marker) pair is checked against osutils.kind_marker,
    # in the order listed.
    expected_markers = [
        ('file', ''),
        ('directory', '/'),
        ('symlink', '@'),
        ('tree-reference', '+'),
    ]
    for kind, marker in expected_markers:
        self.assertEqual(osutils.kind_marker(kind), marker)
238
def test_get_umask(self):
239
if sys.platform == 'win32':
240
# umask always returns '0', no way to set it
241
self.assertEqual(0, osutils.get_umask())
244
orig_umask = osutils.get_umask()
247
self.assertEqual(0222, osutils.get_umask())
249
self.assertEqual(0022, osutils.get_umask())
251
self.assertEqual(0002, osutils.get_umask())
253
self.assertEqual(0027, osutils.get_umask())
257
def assertFormatedDelta(self, expected, seconds):
    """Check that osutils.format_delta(seconds) produces ``expected``.

    :param expected: The string format_delta is required to return.
    :param seconds: The delta value handed to osutils.format_delta.
    """
    self.assertEqual(expected, osutils.format_delta(seconds))
262
def test_format_delta(self):
    # (expected text, delta in seconds) pairs for deltas in the past.
    past_cases = [
        ('0 seconds ago', 0),
        ('1 second ago', 1),
        ('10 seconds ago', 10),
        ('59 seconds ago', 59),
        ('89 seconds ago', 89),
        ('1 minute, 30 seconds ago', 90),
        ('3 minutes, 0 seconds ago', 180),
        ('3 minutes, 1 second ago', 181),
        ('10 minutes, 15 seconds ago', 615),
        ('30 minutes, 59 seconds ago', 1859),
        ('31 minutes, 0 seconds ago', 1860),
        ('60 minutes, 0 seconds ago', 3600),
        ('89 minutes, 59 seconds ago', 5399),
        ('1 hour, 30 minutes ago', 5400),
        ('2 hours, 30 minutes ago', 9017),
        ('10 hours, 0 minutes ago', 36000),
        ('24 hours, 0 minutes ago', 86400),
        ('35 hours, 59 minutes ago', 129599),
        ('36 hours, 0 minutes ago', 129600),
        ('36 hours, 0 minutes ago', 129601),
        ('36 hours, 1 minute ago', 129660),
        ('36 hours, 1 minute ago', 129661),
        ('84 hours, 10 minutes ago', 303002),
    ]
    # Negative deltas: clocks on different machines are not synchronized,
    # so time can appear to step backwards and must still be formatted.
    future_cases = [
        ('84 hours, 10 minutes in the future', -303002),
        ('1 second in the future', -1),
        ('2 seconds in the future', -2),
    ]
    for expected, seconds in past_cases + future_cases:
        self.assertFormatedDelta(expected, seconds)
293
def test_format_date(self):
    # An unrecognised timezone name must be rejected.
    self.assertRaises(errors.UnsupportedTimezoneFormat,
                      osutils.format_date, 0, timezone='foo')
    # Only the result types are checked here.
    formatted = osutils.format_date(0)
    self.assertIsInstance(formatted, str)
    formatted_local = osutils.format_local_date(0)
    self.assertIsInstance(formatted_local, unicode)
    # Checking the actual local weekday text would mean duplicating the
    # logic of format_date, so it is not attempted here.
    # blackbox.test_locale should cover localized dates once they
    # appear in output strings.
303
def test_dereference_path(self):
304
self.requireFeature(SymlinkFeature)
305
cwd = osutils.realpath('.')
307
bar_path = osutils.pathjoin(cwd, 'bar')
308
# Using './' to avoid bug #1213894 (first path component not
309
# dereferenced) in Python 2.4.1 and earlier
310
self.assertEqual(bar_path, osutils.realpath('./bar'))
311
os.symlink('bar', 'foo')
312
self.assertEqual(bar_path, osutils.realpath('./foo'))
314
# Does not dereference terminal symlinks
315
foo_path = osutils.pathjoin(cwd, 'foo')
316
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
318
# Dereferences parent symlinks
320
baz_path = osutils.pathjoin(bar_path, 'baz')
321
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
323
# Dereferences parent symlinks that are the first path element
324
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
326
# Dereferences parent symlinks in absolute paths
327
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
328
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
330
def test_changing_access(self):
331
f = file('file', 'w')
335
# Make a file readonly
336
osutils.make_readonly('file')
337
mode = os.lstat('file').st_mode
338
self.assertEqual(mode, mode & 0777555)
340
# Make a file writable
341
osutils.make_writable('file')
342
mode = os.lstat('file').st_mode
343
self.assertEqual(mode, mode | 0200)
345
if osutils.has_symlinks():
346
# should not error when handed a symlink
347
os.symlink('nonexistent', 'dangling')
348
osutils.make_readonly('dangling')
349
osutils.make_writable('dangling')
351
def test_kind_marker(self):
    # NOTE(review): another method named test_kind_marker appears earlier
    # in this file.  If both live in the same class (presumably they do --
    # verify), this later definition replaces the earlier one, so the
    # 'tree-reference' check made there is repeated here to keep it running.
    self.assertEqual("", osutils.kind_marker("file"))
    self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
    self.assertEqual("@", osutils.kind_marker("symlink"))
    self.assertEqual("+", osutils.kind_marker("tree-reference"))
    # A kind that is not recognised raises BzrError.
    self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
357
def test_host_os_dereferences_symlinks(self):
    # Smoke test: the call only has to complete without raising; its
    # return value is deliberately not inspected.
    osutils.host_os_dereferences_symlinks()
361
class TestCanonicalRelPath(TestCaseInTempDir):
363
_test_needs_features = [CaseInsCasePresFilenameFeature]
365
def test_canonical_relpath_simple(self):
366
f = file('MixedCaseName', 'w')
368
self.failUnlessEqual(
369
canonical_relpath(self.test_base_dir, 'mixedcasename'),
370
'work/MixedCaseName')
372
def test_canonical_relpath_missing_tail(self):
    # Only the parent directory exists on disk; 'nochild' does not.
    os.mkdir('MixedCaseParent')
    actual = canonical_relpath(self.test_base_dir,
                               'mixedcaseparent/nochild')
    # The existing parent gets its on-disk case, the missing tail is kept
    # as given.
    self.failUnlessEqual(actual, 'work/MixedCaseParent/nochild')
379
class TestPumpFile(TestCase):
380
"""Test pumpfile method."""
382
# create a test datablock
383
self.block_size = 512
384
pattern = '0123456789ABCDEF'
385
self.test_data = pattern * (3 * self.block_size / len(pattern))
386
self.test_data_len = len(self.test_data)
388
def test_bracket_block_size(self):
389
"""Read data in blocks with the requested read size bracketing the
391
# make sure test data is larger than max read size
392
self.assertTrue(self.test_data_len > self.block_size)
394
from_file = FakeReadFile(self.test_data)
397
# read (max / 2) bytes and verify read size wasn't affected
398
num_bytes_to_read = self.block_size / 2
399
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
400
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
401
self.assertEqual(from_file.get_read_count(), 1)
403
# read (max) bytes and verify read size wasn't affected
404
num_bytes_to_read = self.block_size
405
from_file.reset_read_count()
406
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
407
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
408
self.assertEqual(from_file.get_read_count(), 1)
410
# read (max + 1) bytes and verify read size was limited
411
num_bytes_to_read = self.block_size + 1
412
from_file.reset_read_count()
413
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
414
self.assertEqual(from_file.get_max_read_size(), self.block_size)
415
self.assertEqual(from_file.get_read_count(), 2)
417
# finish reading the rest of the data
418
num_bytes_to_read = self.test_data_len - to_file.tell()
419
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
421
# report error if the data wasn't equal (we only report the size due
422
# to the length of the data)
423
response_data = to_file.getvalue()
424
if response_data != self.test_data:
425
message = "Data not equal. Expected %d bytes, received %d."
426
self.fail(message % (len(response_data), self.test_data_len))
428
def test_specified_size(self):
429
"""Request a transfer larger than the maximum block size and verify
430
that the maximum read doesn't exceed the block_size."""
431
# make sure test data is larger than max read size
432
self.assertTrue(self.test_data_len > self.block_size)
434
# retrieve data in blocks
435
from_file = FakeReadFile(self.test_data)
437
pumpfile(from_file, to_file, self.test_data_len, self.block_size)
439
# verify read size was equal to the maximum read size
440
self.assertTrue(from_file.get_max_read_size() > 0)
441
self.assertEqual(from_file.get_max_read_size(), self.block_size)
442
self.assertEqual(from_file.get_read_count(), 3)
444
# report error if the data wasn't equal (we only report the size due
445
# to the length of the data)
446
response_data = to_file.getvalue()
447
if response_data != self.test_data:
448
message = "Data not equal. Expected %d bytes, received %d."
449
self.fail(message % (len(response_data), self.test_data_len))
451
def test_to_eof(self):
452
"""Read to end-of-file and verify that the reads are not larger than
453
the maximum read size."""
454
# make sure test data is larger than max read size
455
self.assertTrue(self.test_data_len > self.block_size)
457
# retrieve data to EOF
458
from_file = FakeReadFile(self.test_data)
460
pumpfile(from_file, to_file, -1, self.block_size)
462
# verify read size was equal to the maximum read size
463
self.assertEqual(from_file.get_max_read_size(), self.block_size)
464
self.assertEqual(from_file.get_read_count(), 4)
466
# report error if the data wasn't equal (we only report the size due
467
# to the length of the data)
468
response_data = to_file.getvalue()
469
if response_data != self.test_data:
470
message = "Data not equal. Expected %d bytes, received %d."
471
self.fail(message % (len(response_data), self.test_data_len))
473
def test_defaults(self):
474
"""Verifies that the default arguments will read to EOF -- this
475
test verifies that any existing usages of pumpfile will not be broken
476
with this new version."""
477
# retrieve data using default (old) pumpfile method
478
from_file = FakeReadFile(self.test_data)
480
pumpfile(from_file, to_file)
482
# report error if the data wasn't equal (we only report the size due
483
# to the length of the data)
484
response_data = to_file.getvalue()
485
if response_data != self.test_data:
486
message = "Data not equal. Expected %d bytes, received %d."
487
self.fail(message % (len(response_data), self.test_data_len))
490
class TestPumpStringFile(TestCase):
492
def test_empty(self):
494
pump_string_file("", output)
495
self.assertEqual("", output.getvalue())
497
def test_more_than_segment_size(self):
499
pump_string_file("123456789", output, 2)
500
self.assertEqual("123456789", output.getvalue())
502
def test_segment_size(self):
504
pump_string_file("12", output, 2)
505
self.assertEqual("12", output.getvalue())
507
def test_segment_size_multiple(self):
509
pump_string_file("1234", output, 2)
510
self.assertEqual("1234", output.getvalue())
130
513
class TestSafeUnicode(TestCase):
236
820
self.build_tree(tree)
237
821
expected_dirblocks = [
239
('0file', '0file', 'file'),
240
('1dir', '1dir', 'directory'),
241
('2file', '2file', 'file'),
244
('1dir/0file', '0file', 'file'),
245
('1dir/1dir', '1dir', 'directory'),
252
for dirblock in osutils.walkdirs('.'):
253
if len(dirblock) and dirblock[0][1] == '.bzr':
254
# this tests the filtering of selected paths
257
result.append(dirblock)
259
self.assertTrue(found_bzrdir)
260
self.assertEqual(expected_dirblocks,
261
[[line[0:3] for line in block] for block in result])
262
# you can search a subdir only, with a supplied prefix.
264
for dirblock in osutils.walkdirs('1dir', '1dir'):
265
result.append(dirblock)
266
self.assertEqual(expected_dirblocks[1:],
267
[[line[0:3] for line in block] for block in result])
823
[('0file', '0file', 'file'),
824
('1dir', '1dir', 'directory'),
825
('2file', '2file', 'file'),
829
[('1dir/0file', '0file', 'file'),
830
('1dir/1dir', '1dir', 'directory'),
833
(('1dir/1dir', './1dir/1dir'),
840
for dirdetail, dirblock in osutils.walkdirs('.'):
841
if len(dirblock) and dirblock[0][1] == '.bzr':
842
# this tests the filtering of selected paths
845
result.append((dirdetail, dirblock))
847
self.assertTrue(found_bzrdir)
848
self.assertEqual(expected_dirblocks,
849
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
850
# you can search a subdir only, with a supplied prefix.
852
for dirblock in osutils.walkdirs('./1dir', '1dir'):
853
result.append(dirblock)
854
self.assertEqual(expected_dirblocks[1:],
855
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
857
def test__walkdirs_utf8(self):
866
self.build_tree(tree)
867
expected_dirblocks = [
869
[('0file', '0file', 'file'),
870
('1dir', '1dir', 'directory'),
871
('2file', '2file', 'file'),
875
[('1dir/0file', '0file', 'file'),
876
('1dir/1dir', '1dir', 'directory'),
879
(('1dir/1dir', './1dir/1dir'),
886
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
887
if len(dirblock) and dirblock[0][1] == '.bzr':
888
# this tests the filtering of selected paths
891
result.append((dirdetail, dirblock))
893
self.assertTrue(found_bzrdir)
894
self.assertEqual(expected_dirblocks,
895
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
896
# you can search a subdir only, with a supplied prefix.
898
for dirblock in osutils.walkdirs('./1dir', '1dir'):
899
result.append(dirblock)
900
self.assertEqual(expected_dirblocks[1:],
901
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
903
def _filter_out_stat(self, result):
904
"""Filter out the stat value from the walkdirs result"""
905
for dirdetail, dirblock in result:
907
for info in dirblock:
908
# Ignore info[3] which is the stat
909
new_dirblock.append((info[0], info[1], info[2], info[4]))
910
dirblock[:] = new_dirblock
912
def _save_platform_info(self):
913
cur_winver = win32utils.winver
914
cur_fs_enc = osutils._fs_enc
915
cur_dir_reader = osutils._selected_dir_reader
917
win32utils.winver = cur_winver
918
osutils._fs_enc = cur_fs_enc
919
osutils._selected_dir_reader = cur_dir_reader
920
self.addCleanup(restore)
922
def assertReadFSDirIs(self, expected):
    """Check which directory reader _walkdirs_utf8 ends up selecting.

    :param expected: The class that osutils._selected_dir_reader must be
        an instance of after a walk has been performed.
    """
    # Clear the cached reader so the next walk has to pick one again.
    osutils._selected_dir_reader = None
    # Walking '.' is expected to list nothing, but it still runs the
    # selection logic.
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([(('', '.'), [])], walked)
    self.assertIsInstance(osutils._selected_dir_reader, expected)
930
def test_force_walkdirs_utf8_fs_utf8(self):
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Pretend the filesystem encoding is UTF-8 ...
    osutils._fs_enc = 'UTF-8'
    # ... and keep the win32 detection code out of the way.
    win32utils.winver = None
    self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
937
def test_force_walkdirs_utf8_fs_ascii(self):
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Pretend the filesystem encoding is US-ASCII ...
    osutils._fs_enc = 'US-ASCII'
    # ... and keep the win32 detection code out of the way.
    win32utils.winver = None
    self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
944
def test_force_walkdirs_utf8_fs_ANSI(self):
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    # Pretend the filesystem encoding is reported as ANSI_X3.4-1968 ...
    osutils._fs_enc = 'ANSI_X3.4-1968'
    # ... and keep the win32 detection code out of the way.
    win32utils.winver = None
    self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
951
def test_force_walkdirs_utf8_fs_latin1(self):
    self._save_platform_info()
    # Pretend the filesystem encoding is latin1 ...
    osutils._fs_enc = 'latin1'
    # ... and keep the win32 detection code out of the way.
    win32utils.winver = None
    # With this encoding the UnicodeDirReader is the one expected.
    self.assertReadFSDirIs(osutils.UnicodeDirReader)
957
def test_force_walkdirs_utf8_nt(self):
    # NOTE(review): an older comment here described this test as disabled
    # ("the thunk of the whole walkdirs api is disabled"), but nothing in
    # the method body skips it -- confirm whether that note still applies.
    self.requireFeature(Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    # Imported locally, after the feature check above.
    from bzrlib._walkdirs_win32 import Win32ReadDir
    self.assertReadFSDirIs(Win32ReadDir)
965
def test_force_walkdirs_utf8_98(self):
    self.requireFeature(Win32ReadDirFeature)
    self._save_platform_info()
    # With winver reporting 'Windows 98' the UnicodeDirReader is expected
    # rather than the win32 reader.
    win32utils.winver = 'Windows 98'
    self.assertReadFSDirIs(osutils.UnicodeDirReader)
971
def test_unicode_walkdirs(self):
972
"""Walkdirs should always return unicode paths."""
973
name0 = u'0file-\xb6'
974
name1 = u'1dir-\u062c\u0648'
975
name2 = u'2file-\u0633'
980
name1 + '/' + name1 + '/',
984
self.build_tree(tree)
986
raise TestSkipped('Could not represent Unicode chars'
987
' in current encoding.')
988
expected_dirblocks = [
990
[(name0, name0, 'file', './' + name0),
991
(name1, name1, 'directory', './' + name1),
992
(name2, name2, 'file', './' + name2),
995
((name1, './' + name1),
996
[(name1 + '/' + name0, name0, 'file', './' + name1
998
(name1 + '/' + name1, name1, 'directory', './' + name1
1002
((name1 + '/' + name1, './' + name1 + '/' + name1),
1007
result = list(osutils.walkdirs('.'))
1008
self._filter_out_stat(result)
1009
self.assertEqual(expected_dirblocks, result)
1010
result = list(osutils.walkdirs(u'./'+name1, name1))
1011
self._filter_out_stat(result)
1012
self.assertEqual(expected_dirblocks[1:], result)
1014
def test_unicode__walkdirs_utf8(self):
1015
"""Walkdirs_utf8 should always return utf8 paths.
1017
The abspath portion might be in unicode or utf-8
1019
name0 = u'0file-\xb6'
1020
name1 = u'1dir-\u062c\u0648'
1021
name2 = u'2file-\u0633'
1025
name1 + '/' + name0,
1026
name1 + '/' + name1 + '/',
1030
self.build_tree(tree)
1031
except UnicodeError:
1032
raise TestSkipped('Could not represent Unicode chars'
1033
' in current encoding.')
1034
name0 = name0.encode('utf8')
1035
name1 = name1.encode('utf8')
1036
name2 = name2.encode('utf8')
1038
expected_dirblocks = [
1040
[(name0, name0, 'file', './' + name0),
1041
(name1, name1, 'directory', './' + name1),
1042
(name2, name2, 'file', './' + name2),
1045
((name1, './' + name1),
1046
[(name1 + '/' + name0, name0, 'file', './' + name1
1048
(name1 + '/' + name1, name1, 'directory', './' + name1
1052
((name1 + '/' + name1, './' + name1 + '/' + name1),
1058
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1059
# all abspaths are Unicode, and encode them back into utf8.
1060
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1061
self.assertIsInstance(dirdetail[0], str)
1062
if isinstance(dirdetail[1], unicode):
1063
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1064
dirblock = [list(info) for info in dirblock]
1065
for info in dirblock:
1066
self.assertIsInstance(info[4], unicode)
1067
info[4] = info[4].encode('utf8')
1069
for info in dirblock:
1070
self.assertIsInstance(info[0], str)
1071
self.assertIsInstance(info[1], str)
1072
self.assertIsInstance(info[4], str)
1073
# Remove the stat information
1074
new_dirblock.append((info[0], info[1], info[2], info[4]))
1075
result.append((dirdetail, new_dirblock))
1076
self.assertEqual(expected_dirblocks, result)
1078
def test__walkdirs_utf8_with_unicode_fs(self):
1079
"""UnicodeDirReader should be a safe fallback everywhere
1081
The abspath portion should be in unicode
1083
# Use the unicode reader. TODO: split into driver-and-driven unit
1085
self._save_platform_info()
1086
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1087
name0u = u'0file-\xb6'
1088
name1u = u'1dir-\u062c\u0648'
1089
name2u = u'2file-\u0633'
1093
name1u + '/' + name0u,
1094
name1u + '/' + name1u + '/',
1098
self.build_tree(tree)
1099
except UnicodeError:
1100
raise TestSkipped('Could not represent Unicode chars'
1101
' in current encoding.')
1102
name0 = name0u.encode('utf8')
1103
name1 = name1u.encode('utf8')
1104
name2 = name2u.encode('utf8')
1106
# All of the abspaths should be in unicode, all of the relative paths
1108
expected_dirblocks = [
1110
[(name0, name0, 'file', './' + name0u),
1111
(name1, name1, 'directory', './' + name1u),
1112
(name2, name2, 'file', './' + name2u),
1115
((name1, './' + name1u),
1116
[(name1 + '/' + name0, name0, 'file', './' + name1u
1118
(name1 + '/' + name1, name1, 'directory', './' + name1u
1122
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1127
result = list(osutils._walkdirs_utf8('.'))
1128
self._filter_out_stat(result)
1129
self.assertEqual(expected_dirblocks, result)
1131
def test__walkdirs_utf8_win32readdir(self):
1132
self.requireFeature(Win32ReadDirFeature)
1133
self.requireFeature(tests.UnicodeFilenameFeature)
1134
from bzrlib._walkdirs_win32 import Win32ReadDir
1135
self._save_platform_info()
1136
osutils._selected_dir_reader = Win32ReadDir()
1137
name0u = u'0file-\xb6'
1138
name1u = u'1dir-\u062c\u0648'
1139
name2u = u'2file-\u0633'
1143
name1u + '/' + name0u,
1144
name1u + '/' + name1u + '/',
1147
self.build_tree(tree)
1148
name0 = name0u.encode('utf8')
1149
name1 = name1u.encode('utf8')
1150
name2 = name2u.encode('utf8')
1152
# All of the abspaths should be in unicode, all of the relative paths
1154
expected_dirblocks = [
1156
[(name0, name0, 'file', './' + name0u),
1157
(name1, name1, 'directory', './' + name1u),
1158
(name2, name2, 'file', './' + name2u),
1161
((name1, './' + name1u),
1162
[(name1 + '/' + name0, name0, 'file', './' + name1u
1164
(name1 + '/' + name1, name1, 'directory', './' + name1u
1168
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1173
result = list(osutils._walkdirs_utf8(u'.'))
1174
self._filter_out_stat(result)
1175
self.assertEqual(expected_dirblocks, result)
1177
def assertStatIsCorrect(self, path, win32stat):
    """Compare ``win32stat`` field by field with os.stat(path).

    :param path: Path handed to os.stat to obtain the reference values.
    :param win32stat: Stat-like object whose st_* attributes are checked.
    """
    reference = os.stat(path)
    self.assertEqual(reference.st_size, win32stat.st_size)
    # Timestamps only have to agree to four decimal places.
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(reference, field),
                               getattr(win32stat, field), places=4)
    # The remaining fields must match exactly.
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(reference, field),
                         getattr(win32stat, field))
1187
def test__walkdirs_utf_win32_find_file_stat_file(self):
1188
"""make sure our Stat values are valid"""
1189
self.requireFeature(Win32ReadDirFeature)
1190
self.requireFeature(tests.UnicodeFilenameFeature)
1191
from bzrlib._walkdirs_win32 import Win32ReadDir
1192
name0u = u'0file-\xb6'
1193
name0 = name0u.encode('utf8')
1194
self.build_tree([name0u])
1195
# I hate to sleep() here, but I'm trying to make the ctime different
1198
f = open(name0u, 'ab')
1200
f.write('just a small update')
1204
result = Win32ReadDir().read_dir('', u'.')
1206
self.assertEqual((name0, name0, 'file'), entry[:3])
1207
self.assertEqual(u'./' + name0u, entry[4])
1208
self.assertStatIsCorrect(entry[4], entry[3])
1209
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1211
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1212
"""make sure our Stat values are valid"""
1213
self.requireFeature(Win32ReadDirFeature)
1214
self.requireFeature(tests.UnicodeFilenameFeature)
1215
from bzrlib._walkdirs_win32 import Win32ReadDir
1216
name0u = u'0dir-\u062c\u0648'
1217
name0 = name0u.encode('utf8')
1218
self.build_tree([name0u + '/'])
1220
result = Win32ReadDir().read_dir('', u'.')
1222
self.assertEqual((name0, name0, 'directory'), entry[:3])
1223
self.assertEqual(u'./' + name0u, entry[4])
1224
self.assertStatIsCorrect(entry[4], entry[3])
1226
def assertPathCompare(self, path_less, path_greater):
    """Check compare_paths_prefix_order on a pair, in both directions.

    :param path_less: Path expected to order before ``path_greater``.
    :param path_greater: Path expected to order after ``path_less``.
    """
    compare = osutils.compare_paths_prefix_order
    # (expected result, left operand, right operand)
    checks = [
        (0, path_less, path_less),
        (0, path_greater, path_greater),
        (-1, path_less, path_greater),
        (1, path_greater, path_less),
    ]
    for expected, left, right in checks:
        self.assertEqual(expected, compare(left, right))
1237
def test_compare_paths_prefix_order(self):
    # (lesser, greater) pairs, written without a leading '/'.
    ordered_pairs = [
        # root before all else
        ("", "a"),
        # alphabetical within a directory
        ("a", "b"),
        ("b", "z"),
        # higher directories before deeper ones
        ("z", "a/a"),
        # except if the deeper dir should be output first
        ("a/b/c", "d/g"),
        # lexical between directories of the same height
        ("a/z", "z/z"),
        ("a/c/z", "a/d/e"),
    ]
    # The ordering has to hold both with a leading '/' and without one.
    for prefix in ("/", ""):
        for lesser, greater in ordered_pairs:
            self.assertPathCompare(prefix + lesser, prefix + greater)
1265
def test_path_prefix_sorting(self):
1266
"""Doing a sort on path prefix should match our sample data."""
1281
dir_sorted_paths = [
1297
sorted(original_paths, key=osutils.path_prefix_key))
1298
# using the comparison routine should work too:
1301
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1304
class TestCopyTree(TestCaseInTempDir):
1306
def test_copy_basic_tree(self):
    # A small tree: one file and one directory holding a file.
    source_layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(source_layout)
    osutils.copy_tree('source', 'target')
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    nested = os.listdir('target/b')
    self.assertEqual(['c'], nested)
1312
def test_copy_tree_target_exists(self):
1313
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1315
osutils.copy_tree('source', 'target')
1316
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1317
self.assertEqual(['c'], os.listdir('target/b'))
1319
def test_copy_tree_symlinks(self):
    self.requireFeature(SymlinkFeature)
    link_target = 'a/generic/path'
    self.build_tree(['source/'])
    os.symlink(link_target, 'source/lnk')
    osutils.copy_tree('source', 'target')
    # The copy holds a link with the same name pointing at the same target.
    copied_names = os.listdir('target')
    self.assertEqual(['lnk'], copied_names)
    self.assertEqual(link_target, os.readlink('target/lnk'))
1327
def test_copy_tree_handlers(self):
1328
processed_files = []
1329
processed_links = []
1330
def file_handler(from_path, to_path):
1331
processed_files.append(('f', from_path, to_path))
1332
def dir_handler(from_path, to_path):
1333
processed_files.append(('d', from_path, to_path))
1334
def link_handler(from_path, to_path):
1335
processed_links.append((from_path, to_path))
1336
handlers = {'file':file_handler,
1337
'directory':dir_handler,
1338
'symlink':link_handler,
1341
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1342
if osutils.has_symlinks():
1343
os.symlink('a/generic/path', 'source/lnk')
1344
osutils.copy_tree('source', 'target', handlers=handlers)
1346
self.assertEqual([('d', 'source', 'target'),
1347
('f', 'source/a', 'target/a'),
1348
('d', 'source/b', 'target/b'),
1349
('f', 'source/b/c', 'target/b/c'),
1351
self.failIfExists('target')
1352
if osutils.has_symlinks():
1353
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1356
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
1357
# [bialix] 2006/12/26
1360
class TestSetUnsetEnv(TestCase):
1361
"""Test updating the environment"""
1364
super(TestSetUnsetEnv, self).setUp()
1366
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1367
'Environment was not cleaned up properly.'
1368
' Variable BZR_TEST_ENV_VAR should not exist.')
1370
if 'BZR_TEST_ENV_VAR' in os.environ:
1371
del os.environ['BZR_TEST_ENV_VAR']
1373
self.addCleanup(cleanup)
1376
"""Test that we can set an env variable"""
1377
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1378
self.assertEqual(None, old)
1379
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1381
def test_double_set(self):
    """Setting a variable twice hands back the first value."""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, 'bar')
    self.assertEqual('foo', previous)
    self.assertEqual('bar', os.environ.get(name))
1388
def test_unicode(self):
1389
"""Environment can only contain plain strings
1391
So Unicode strings must be encoded.
1393
uni_val, env_val = probe_unicode_in_user_encoding()
1395
raise TestSkipped('Cannot find a unicode character that works in'
1396
' encoding %s' % (osutils.get_user_encoding(),))
1398
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1399
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1401
def test_unset(self):
    """Passing None as the value removes the variable from the environment."""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, None)
    # The value that was set beforehand is returned ...
    self.assertEqual('foo', previous)
    # ... and the variable is gone from os.environ.
    self.assertEqual(None, os.environ.get(name))
    self.failIf(name in os.environ)
1410
class TestLocalTimeOffset(TestCase):
    """Plausibility checks for osutils.local_time_offset()."""

    def _check_plausible_offset(self, offset):
        """Check that ``offset`` is an int within eighteen hours of zero."""
        self.assertTrue(isinstance(offset, int))
        # Time zone handling is system specific, so nothing more precise
        # is checked: the offset only has to lie strictly within eighteen
        # hours in either direction.
        limit = 18 * 3600
        self.assertTrue(-limit < offset < limit)

    def test_local_time_offset(self):
        """Test that local_time_offset() returns a sane value."""
        self._check_plausible_offset(osutils.local_time_offset())

    def test_local_time_offset_with_timestamp(self):
        """Test that local_time_offset() works with a timestamp."""
        self._check_plausible_offset(
            osutils.local_time_offset(1000000000.1234567))
1432
class TestShaFileByName(TestCaseInTempDir):
    """sha_file_by_name must agree with sha_string on the same content."""

    def _check_sha_of_content(self, content):
        """Write ``content`` to 'foo' and compare both ways of hashing it."""
        self.build_tree_contents([('foo', content)])
        expected_sha = osutils.sha_string(content)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_empty(self):
        self._check_sha_of_content('')

    def test_sha_mixed_endings(self):
        # The text mixes \r\n, \n and \r line endings.
        self._check_sha_of_content(
            'test\r\nwith\nall\rpossible line endings\r\n')
1447
r'''# Copyright (C) 2005, 2006 Canonical Ltd
1449
# This program is free software; you can redistribute it and/or modify
1450
# it under the terms of the GNU General Public License as published by
1451
# the Free Software Foundation; either version 2 of the License, or
1452
# (at your option) any later version.
1454
# This program is distributed in the hope that it will be useful,
1455
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1456
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1457
# GNU General Public License for more details.
1459
# You should have received a copy of the GNU General Public License
1460
# along with this program; if not, write to the Free Software
1461
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1464
# NOTE: If update these, please also update the help for global-options in
1465
# bzrlib/help_topics/__init__.py
1468
"""Set of flags that enable different debug behaviour.
1470
These are set with eg ``-Dlock`` on the bzr command line.
1474
* auth - show authentication sections used
1475
* error - show stack traces for all top level exceptions
1476
* evil - capture call sites that do expensive or badly-scaling operations.
1477
* fetch - trace history copying between repositories
1478
* graph - trace graph traversal information
1479
* hashcache - log every time a working file is read to determine its hash
1480
* hooks - trace hook execution
1481
* hpss - trace smart protocol requests and responses
1482
* http - trace http connections, requests and responses
1483
* index - trace major index operations
1484
* knit - trace knit operations
1485
* lock - trace when lockdir locks are taken or released
1486
* merge - emit information for debugging merges
1487
* pack - emit information about pack operations
1493
class TestResourceLoading(TestCaseInTempDir):
1495
def test_resource_string(self):
1496
# test resource in bzrlib
1497
text = osutils.resource_string('bzrlib', 'debug.py')
1498
self.assertEquals(_debug_text, text)
1499
# test resource under bzrlib
1500
text = osutils.resource_string('bzrlib.ui', 'text.py')
1501
self.assertContainsRe(text, "class TextUIFactory")
1502
# test unsupported package
1503
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1505
# test unknown resource
1506
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')