127
208
os.remove('socket')
210
def test_kind_marker(self):
    """kind_marker() maps each known kind name to its marker string."""
    expected_markers = [
        ('file', ''),
        ('directory', '/'),
        ('symlink', '@'),
        ('tree-reference', '+'),
    ]
    for kind, marker in expected_markers:
        self.assertEqual(osutils.kind_marker(kind), marker)
216
def test_get_umask(self):
217
if sys.platform == 'win32':
218
# umask always returns '0', no way to set it
219
self.assertEqual(0, osutils.get_umask())
222
orig_umask = osutils.get_umask()
225
self.assertEqual(0222, osutils.get_umask())
227
self.assertEqual(0022, osutils.get_umask())
229
self.assertEqual(0002, osutils.get_umask())
231
self.assertEqual(0027, osutils.get_umask())
235
def assertFormatedDelta(self, expected, seconds):
    """Check that osutils.format_delta(seconds) renders as ``expected``."""
    self.assertEqual(expected, osutils.format_delta(seconds))
240
def test_format_delta(self):
241
self.assertFormatedDelta('0 seconds ago', 0)
242
self.assertFormatedDelta('1 second ago', 1)
243
self.assertFormatedDelta('10 seconds ago', 10)
244
self.assertFormatedDelta('59 seconds ago', 59)
245
self.assertFormatedDelta('89 seconds ago', 89)
246
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
247
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
248
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
249
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
250
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
251
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
252
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
253
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
254
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
255
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
256
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
257
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
258
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
259
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
260
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
261
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
262
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
263
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
265
# We handle when time steps the wrong direction because computers
266
# don't have synchronized clocks.
267
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
268
self.assertFormatedDelta('1 second in the future', -1)
269
self.assertFormatedDelta('2 seconds in the future', -2)
271
def test_format_date(self):
    """format_date() raises UnsupportedTimezoneFormat for timezone='foo'."""
    bad_timezone = {'timezone': 'foo'}
    self.assertRaises(errors.UnsupportedTimezoneFormat,
                      osutils.format_date, 0, **bad_timezone)
275
def test_dereference_path(self):
276
self.requireFeature(SymlinkFeature)
277
cwd = osutils.realpath('.')
279
bar_path = osutils.pathjoin(cwd, 'bar')
280
# Using './' to avoid bug #1213894 (first path component not
281
# dereferenced) in Python 2.4.1 and earlier
282
self.assertEqual(bar_path, osutils.realpath('./bar'))
283
os.symlink('bar', 'foo')
284
self.assertEqual(bar_path, osutils.realpath('./foo'))
286
# Does not dereference terminal symlinks
287
foo_path = osutils.pathjoin(cwd, 'foo')
288
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
290
# Dereferences parent symlinks
292
baz_path = osutils.pathjoin(bar_path, 'baz')
293
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
295
# Dereferences parent symlinks that are the first path element
296
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
298
# Dereferences parent symlinks in absolute paths
299
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
300
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
302
def test_changing_access(self):
303
f = file('file', 'w')
307
# Make a file readonly
308
osutils.make_readonly('file')
309
mode = os.lstat('file').st_mode
310
self.assertEqual(mode, mode & 0777555)
312
# Make a file writable
313
osutils.make_writable('file')
314
mode = os.lstat('file').st_mode
315
self.assertEqual(mode, mode | 0200)
317
if osutils.has_symlinks():
318
# should not error when handed a symlink
319
os.symlink('nonexistent', 'dangling')
320
osutils.make_readonly('dangling')
321
osutils.make_writable('dangling')
323
def test_kind_marker(self):
    """kind_marker() maps known kinds to markers and rejects unknown kinds.

    NOTE(review): an earlier method with this same name is defined further
    up.  If both sit in the same class, this later definition replaces the
    earlier one, so the earlier 'tree-reference' check would never run.  It
    is repeated here so that coverage is kept either way -- confirm and
    then drop the earlier duplicate.
    """
    self.assertEqual("", osutils.kind_marker("file"))
    self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
    self.assertEqual("@", osutils.kind_marker("symlink"))
    # Carried over from the earlier definition of this test.
    self.assertEqual("+", osutils.kind_marker("tree-reference"))
    # Anything that is not a known kind must be reported as an error.
    self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
329
def test_host_os_dereferences_symlinks(self):
    """Smoke test: the call must complete without raising."""
    # Only the absence of an exception is checked; the result is discarded.
    osutils.host_os_dereferences_symlinks()
333
class TestPumpFile(TestCase):
334
"""Test pumpfile method."""
336
# create a test datablock
337
self.block_size = 512
338
pattern = '0123456789ABCDEF'
339
self.test_data = pattern * (3 * self.block_size / len(pattern))
340
self.test_data_len = len(self.test_data)
342
def test_bracket_block_size(self):
343
"""Read data in blocks with the requested read size bracketing the
345
# make sure test data is larger than max read size
346
self.assertTrue(self.test_data_len > self.block_size)
348
from_file = FakeReadFile(self.test_data)
351
# read (max / 2) bytes and verify read size wasn't affected
352
num_bytes_to_read = self.block_size / 2
353
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
354
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
355
self.assertEqual(from_file.get_read_count(), 1)
357
# read (max) bytes and verify read size wasn't affected
358
num_bytes_to_read = self.block_size
359
from_file.reset_read_count()
360
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
361
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
362
self.assertEqual(from_file.get_read_count(), 1)
364
# read (max + 1) bytes and verify read size was limited
365
num_bytes_to_read = self.block_size + 1
366
from_file.reset_read_count()
367
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
368
self.assertEqual(from_file.get_max_read_size(), self.block_size)
369
self.assertEqual(from_file.get_read_count(), 2)
371
# finish reading the rest of the data
372
num_bytes_to_read = self.test_data_len - to_file.tell()
373
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
375
# report error if the data wasn't equal (we only report the size due
376
# to the length of the data)
377
response_data = to_file.getvalue()
378
if response_data != self.test_data:
379
message = "Data not equal. Expected %d bytes, received %d."
380
self.fail(message % (len(response_data), self.test_data_len))
382
def test_specified_size(self):
    """Request a transfer larger than the maximum block size and verify
    that the maximum read doesn't exceed the block_size."""
    # make sure test data is larger than max read size
    self.assertTrue(self.test_data_len > self.block_size)

    # retrieve data in blocks
    from_file = FakeReadFile(self.test_data)
    # NOTE(review): ``to_file`` is not assigned in the visible lines of
    # this method -- presumably an in-memory file created on a line that is
    # missing from this view; confirm before merging.
    pumpfile(from_file, to_file, self.test_data_len, self.block_size)

    # verify read size was equal to the maximum read size
    self.assertTrue(from_file.get_max_read_size() > 0)
    self.assertEqual(from_file.get_max_read_size(), self.block_size)
    self.assertEqual(from_file.get_read_count(), 3)

    # report error if the data wasn't equal (we only report the size due
    # to the length of the data)
    response_data = to_file.getvalue()
    if response_data != self.test_data:
        message = "Data not equal. Expected %d bytes, received %d."
        # Expected size first, received size second, matching the message
        # (the two arguments used to be passed the other way round).
        self.fail(message % (self.test_data_len, len(response_data)))
405
def test_to_eof(self):
    """Read to end-of-file and verify that the reads are not larger than
    the maximum read size."""
    # make sure test data is larger than max read size
    self.assertTrue(self.test_data_len > self.block_size)

    # retrieve data to EOF
    from_file = FakeReadFile(self.test_data)
    # NOTE(review): ``to_file`` is not assigned in the visible lines of
    # this method -- presumably an in-memory file created on a line that is
    # missing from this view; confirm before merging.
    pumpfile(from_file, to_file, -1, self.block_size)

    # verify read size was equal to the maximum read size
    self.assertEqual(from_file.get_max_read_size(), self.block_size)
    self.assertEqual(from_file.get_read_count(), 4)

    # report error if the data wasn't equal (we only report the size due
    # to the length of the data)
    response_data = to_file.getvalue()
    if response_data != self.test_data:
        message = "Data not equal. Expected %d bytes, received %d."
        # Expected size first, received size second, matching the message
        # (the two arguments used to be passed the other way round).
        self.fail(message % (self.test_data_len, len(response_data)))
427
def test_defaults(self):
    """Verifies that the default arguments will read to EOF -- this
    test verifies that any existing usages of pumpfile will not be broken
    with this new version."""
    # retrieve data using default (old) pumpfile method
    from_file = FakeReadFile(self.test_data)
    # NOTE(review): ``to_file`` is not assigned in the visible lines of
    # this method -- presumably an in-memory file created on a line that is
    # missing from this view; confirm before merging.
    pumpfile(from_file, to_file)

    # report error if the data wasn't equal (we only report the size due
    # to the length of the data)
    response_data = to_file.getvalue()
    if response_data != self.test_data:
        message = "Data not equal. Expected %d bytes, received %d."
        # Expected size first, received size second, matching the message
        # (the two arguments used to be passed the other way round).
        self.fail(message % (self.test_data_len, len(response_data)))
130
443
class TestSafeUnicode(TestCase):
236
733
self.build_tree(tree)
237
734
expected_dirblocks = [
239
('0file', '0file', 'file'),
240
('1dir', '1dir', 'directory'),
241
('2file', '2file', 'file'),
244
('1dir/0file', '0file', 'file'),
245
('1dir/1dir', '1dir', 'directory'),
252
for dirblock in osutils.walkdirs('.'):
253
if len(dirblock) and dirblock[0][1] == '.bzr':
254
# this tests the filtering of selected paths
257
result.append(dirblock)
259
self.assertTrue(found_bzrdir)
260
self.assertEqual(expected_dirblocks,
261
[[line[0:3] for line in block] for block in result])
262
# you can search a subdir only, with a supplied prefix.
264
for dirblock in osutils.walkdirs('1dir', '1dir'):
265
result.append(dirblock)
266
self.assertEqual(expected_dirblocks[1:],
267
[[line[0:3] for line in block] for block in result])
736
[('0file', '0file', 'file'),
737
('1dir', '1dir', 'directory'),
738
('2file', '2file', 'file'),
742
[('1dir/0file', '0file', 'file'),
743
('1dir/1dir', '1dir', 'directory'),
746
(('1dir/1dir', './1dir/1dir'),
753
for dirdetail, dirblock in osutils.walkdirs('.'):
754
if len(dirblock) and dirblock[0][1] == '.bzr':
755
# this tests the filtering of selected paths
758
result.append((dirdetail, dirblock))
760
self.assertTrue(found_bzrdir)
761
self.assertEqual(expected_dirblocks,
762
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
763
# you can search a subdir only, with a supplied prefix.
765
for dirblock in osutils.walkdirs('./1dir', '1dir'):
766
result.append(dirblock)
767
self.assertEqual(expected_dirblocks[1:],
768
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
770
def test__walkdirs_utf8(self):
779
self.build_tree(tree)
780
expected_dirblocks = [
782
[('0file', '0file', 'file'),
783
('1dir', '1dir', 'directory'),
784
('2file', '2file', 'file'),
788
[('1dir/0file', '0file', 'file'),
789
('1dir/1dir', '1dir', 'directory'),
792
(('1dir/1dir', './1dir/1dir'),
799
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
800
if len(dirblock) and dirblock[0][1] == '.bzr':
801
# this tests the filtering of selected paths
804
result.append((dirdetail, dirblock))
806
self.assertTrue(found_bzrdir)
807
self.assertEqual(expected_dirblocks,
808
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
809
# you can search a subdir only, with a supplied prefix.
811
for dirblock in osutils.walkdirs('./1dir', '1dir'):
812
result.append(dirblock)
813
self.assertEqual(expected_dirblocks[1:],
814
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
816
def _filter_out_stat(self, result):
817
"""Filter out the stat value from the walkdirs result"""
818
for dirdetail, dirblock in result:
820
for info in dirblock:
821
# Ignore info[3] which is the stat
822
new_dirblock.append((info[0], info[1], info[2], info[4]))
823
dirblock[:] = new_dirblock
825
def test__walkdirs_utf8_selection(self):
826
# Just trigger the function once, to make sure it has selected a real
828
list(osutils._walkdirs_utf8('.'))
829
if WalkdirsWin32Feature.available():
830
# If the compiled form is available, make sure it is used
831
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
832
self.assertIs(_walkdirs_utf8_win32_find_file,
833
osutils._real_walkdirs_utf8)
834
elif sys.platform == 'win32':
835
self.assertIs(osutils._walkdirs_unicode_to_utf8,
836
osutils._real_walkdirs_utf8)
837
elif osutils._fs_enc.upper() in ('UTF-8', 'ASCII', 'ANSI_X3.4-1968'): # ascii
838
self.assertIs(osutils._walkdirs_fs_utf8,
839
osutils._real_walkdirs_utf8)
841
self.assertIs(osutils._walkdirs_unicode_to_utf8,
842
osutils._real_walkdirs_utf8)
844
def _save_platform_info(self):
845
cur_winver = win32utils.winver
846
cur_fs_enc = osutils._fs_enc
847
cur_real_walkdirs_utf8 = osutils._real_walkdirs_utf8
849
win32utils.winver = cur_winver
850
osutils._fs_enc = cur_fs_enc
851
osutils._real_walkdirs_utf8 = cur_real_walkdirs_utf8
852
self.addCleanup(restore)
854
def assertWalkdirsUtf8Is(self, expected):
    """Check which implementation _walkdirs_utf8 selects.

    :param expected: the function that osutils._real_walkdirs_utf8 must be
        bound to after a walk has been performed.
    """
    # Clear the recorded choice so the next walk has to redetect it.
    osutils._real_walkdirs_utf8 = None
    # There is nothing to list, but the walk still runs the selection logic.
    walked = list(osutils._walkdirs_utf8('.'))
    self.assertEqual([(('', '.'), [])], walked)
    self.assertIs(expected, osutils._real_walkdirs_utf8)
862
def test_force_walkdirs_utf8_fs_utf8(self):
    """With _fs_enc forced to 'UTF-8', _walkdirs_fs_utf8 is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'UTF-8'
    # Avoid the win32 detection code
    win32utils.winver = None
    self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
868
def test_force_walkdirs_utf8_fs_ascii(self):
    """With _fs_enc forced to 'US-ASCII', _walkdirs_fs_utf8 is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'US-ASCII'
    # Avoid the win32 detection code
    win32utils.winver = None
    self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
874
def test_force_walkdirs_utf8_fs_ANSI(self):
    """With _fs_enc forced to 'ANSI_X3.4-1968', _walkdirs_fs_utf8 is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'ANSI_X3.4-1968'
    # Avoid the win32 detection code
    win32utils.winver = None
    self.assertWalkdirsUtf8Is(osutils._walkdirs_fs_utf8)
880
def test_force_walkdirs_utf8_fs_latin1(self):
    """With _fs_enc forced to 'latin1', _walkdirs_unicode_to_utf8 is selected."""
    self._save_platform_info()
    osutils._fs_enc = 'latin1'
    # Avoid the win32 detection code
    win32utils.winver = None
    self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
886
def test_force_walkdirs_utf8_nt(self):
    """With winver forced to 'Windows NT', the win32 find-file walker is selected."""
    self.requireFeature(WalkdirsWin32Feature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    # Imported here rather than at module level; the feature check above
    # has already run by the time this import is attempted.
    from bzrlib._walkdirs_win32 import (
        _walkdirs_utf8_win32_find_file as win32_walker)
    self.assertWalkdirsUtf8Is(win32_walker)
893
def test_force_walkdirs_utf8_98(self):
    """With winver forced to 'Windows 98', _walkdirs_unicode_to_utf8 is selected.

    This test was previously named ``test_force_walkdirs_utf8_nt``, the
    same name as the 'Windows NT' test defined before it.  A later method
    of the same name replaces the earlier one in a class body, so the
    'Windows NT' test was never run; the rename lets both run.
    """
    self.requireFeature(WalkdirsWin32Feature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    self.assertWalkdirsUtf8Is(osutils._walkdirs_unicode_to_utf8)
899
def test_unicode_walkdirs(self):
900
"""Walkdirs should always return unicode paths."""
901
name0 = u'0file-\xb6'
902
name1 = u'1dir-\u062c\u0648'
903
name2 = u'2file-\u0633'
908
name1 + '/' + name1 + '/',
912
self.build_tree(tree)
914
raise TestSkipped('Could not represent Unicode chars'
915
' in current encoding.')
916
expected_dirblocks = [
918
[(name0, name0, 'file', './' + name0),
919
(name1, name1, 'directory', './' + name1),
920
(name2, name2, 'file', './' + name2),
923
((name1, './' + name1),
924
[(name1 + '/' + name0, name0, 'file', './' + name1
926
(name1 + '/' + name1, name1, 'directory', './' + name1
930
((name1 + '/' + name1, './' + name1 + '/' + name1),
935
result = list(osutils.walkdirs('.'))
936
self._filter_out_stat(result)
937
self.assertEqual(expected_dirblocks, result)
938
result = list(osutils.walkdirs(u'./'+name1, name1))
939
self._filter_out_stat(result)
940
self.assertEqual(expected_dirblocks[1:], result)
942
def test_unicode__walkdirs_utf8(self):
943
"""Walkdirs_utf8 should always return utf8 paths.
945
The abspath portion might be in unicode or utf-8
947
name0 = u'0file-\xb6'
948
name1 = u'1dir-\u062c\u0648'
949
name2 = u'2file-\u0633'
954
name1 + '/' + name1 + '/',
958
self.build_tree(tree)
960
raise TestSkipped('Could not represent Unicode chars'
961
' in current encoding.')
962
name0 = name0.encode('utf8')
963
name1 = name1.encode('utf8')
964
name2 = name2.encode('utf8')
966
expected_dirblocks = [
968
[(name0, name0, 'file', './' + name0),
969
(name1, name1, 'directory', './' + name1),
970
(name2, name2, 'file', './' + name2),
973
((name1, './' + name1),
974
[(name1 + '/' + name0, name0, 'file', './' + name1
976
(name1 + '/' + name1, name1, 'directory', './' + name1
980
((name1 + '/' + name1, './' + name1 + '/' + name1),
986
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
987
# all abspaths are Unicode, and encode them back into utf8.
988
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
989
self.assertIsInstance(dirdetail[0], str)
990
if isinstance(dirdetail[1], unicode):
991
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
992
dirblock = [list(info) for info in dirblock]
993
for info in dirblock:
994
self.assertIsInstance(info[4], unicode)
995
info[4] = info[4].encode('utf8')
997
for info in dirblock:
998
self.assertIsInstance(info[0], str)
999
self.assertIsInstance(info[1], str)
1000
self.assertIsInstance(info[4], str)
1001
# Remove the stat information
1002
new_dirblock.append((info[0], info[1], info[2], info[4]))
1003
result.append((dirdetail, new_dirblock))
1004
self.assertEqual(expected_dirblocks, result)
1006
def test_unicode__walkdirs_unicode_to_utf8(self):
1007
"""walkdirs_unicode_to_utf8 should be a safe fallback everywhere
1009
The abspath portion should be in unicode
1011
name0u = u'0file-\xb6'
1012
name1u = u'1dir-\u062c\u0648'
1013
name2u = u'2file-\u0633'
1017
name1u + '/' + name0u,
1018
name1u + '/' + name1u + '/',
1022
self.build_tree(tree)
1023
except UnicodeError:
1024
raise TestSkipped('Could not represent Unicode chars'
1025
' in current encoding.')
1026
name0 = name0u.encode('utf8')
1027
name1 = name1u.encode('utf8')
1028
name2 = name2u.encode('utf8')
1030
# All of the abspaths should be in unicode, all of the relative paths
1032
expected_dirblocks = [
1034
[(name0, name0, 'file', './' + name0u),
1035
(name1, name1, 'directory', './' + name1u),
1036
(name2, name2, 'file', './' + name2u),
1039
((name1, './' + name1u),
1040
[(name1 + '/' + name0, name0, 'file', './' + name1u
1042
(name1 + '/' + name1, name1, 'directory', './' + name1u
1046
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1051
result = list(osutils._walkdirs_unicode_to_utf8('.'))
1052
self._filter_out_stat(result)
1053
self.assertEqual(expected_dirblocks, result)
1055
def test__walkdirs_utf_win32_find_file(self):
1056
self.requireFeature(WalkdirsWin32Feature)
1057
self.requireFeature(tests.UnicodeFilenameFeature)
1058
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1059
name0u = u'0file-\xb6'
1060
name1u = u'1dir-\u062c\u0648'
1061
name2u = u'2file-\u0633'
1065
name1u + '/' + name0u,
1066
name1u + '/' + name1u + '/',
1069
self.build_tree(tree)
1070
name0 = name0u.encode('utf8')
1071
name1 = name1u.encode('utf8')
1072
name2 = name2u.encode('utf8')
1074
# All of the abspaths should be in unicode, all of the relative paths
1076
expected_dirblocks = [
1078
[(name0, name0, 'file', './' + name0u),
1079
(name1, name1, 'directory', './' + name1u),
1080
(name2, name2, 'file', './' + name2u),
1083
((name1, './' + name1u),
1084
[(name1 + '/' + name0, name0, 'file', './' + name1u
1086
(name1 + '/' + name1, name1, 'directory', './' + name1u
1090
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1095
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1096
self._filter_out_stat(result)
1097
self.assertEqual(expected_dirblocks, result)
1099
def assertStatIsCorrect(self, path, win32stat):
    """Compare ``win32stat`` field by field against ``os.stat(path)``.

    :param path: path handed to os.stat() to obtain the reference values.
    :param win32stat: the stat-like object under test.
    """
    reference = os.stat(path)
    self.assertEqual(reference.st_size, win32stat.st_size)
    # Timestamps only have to agree to four decimal places.
    for field in ('st_mtime', 'st_ctime', 'st_atime'):
        self.assertAlmostEqual(getattr(reference, field),
                               getattr(win32stat, field), places=4)
    # The remaining fields must match exactly.
    for field in ('st_dev', 'st_ino', 'st_mode'):
        self.assertEqual(getattr(reference, field),
                         getattr(win32stat, field))
1109
def test__walkdirs_utf_win32_find_file_stat_file(self):
1110
"""make sure our Stat values are valid"""
1111
self.requireFeature(WalkdirsWin32Feature)
1112
self.requireFeature(tests.UnicodeFilenameFeature)
1113
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1114
name0u = u'0file-\xb6'
1115
name0 = name0u.encode('utf8')
1116
self.build_tree([name0u])
1117
# I hate to sleep() here, but I'm trying to make the ctime different
1120
f = open(name0u, 'ab')
1122
f.write('just a small update')
1126
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1127
entry = result[0][1][0]
1128
self.assertEqual((name0, name0, 'file'), entry[:3])
1129
self.assertEqual(u'./' + name0u, entry[4])
1130
self.assertStatIsCorrect(entry[4], entry[3])
1131
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1133
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1134
"""make sure our Stat values are valid"""
1135
self.requireFeature(WalkdirsWin32Feature)
1136
self.requireFeature(tests.UnicodeFilenameFeature)
1137
from bzrlib._walkdirs_win32 import _walkdirs_utf8_win32_find_file
1138
name0u = u'0dir-\u062c\u0648'
1139
name0 = name0u.encode('utf8')
1140
self.build_tree([name0u + '/'])
1142
result = list(_walkdirs_utf8_win32_find_file(u'.'))
1143
entry = result[0][1][0]
1144
self.assertEqual((name0, name0, 'directory'), entry[:3])
1145
self.assertEqual(u'./' + name0u, entry[4])
1146
self.assertStatIsCorrect(entry[4], entry[3])
1148
def assertPathCompare(self, path_less, path_greater):
    """Check compare_paths_prefix_order() on a pair of paths.

    :param path_less: the path expected to sort first.
    :param path_greater: the path expected to sort after ``path_less``.
    """
    compare = osutils.compare_paths_prefix_order
    # Each path compares equal to itself.
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    # The result flips sign when the arguments are swapped.
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1159
def test_compare_paths_prefix_order(self):
1160
# root before all else
1161
self.assertPathCompare("/", "/a")
1162
# alpha within a dir
1163
self.assertPathCompare("/a", "/b")
1164
self.assertPathCompare("/b", "/z")
1165
# high dirs before lower.
1166
self.assertPathCompare("/z", "/a/a")
1167
# except if the deeper dir should be output first
1168
self.assertPathCompare("/a/b/c", "/d/g")
1169
# lexical between dirs of the same height
1170
self.assertPathCompare("/a/z", "/z/z")
1171
self.assertPathCompare("/a/c/z", "/a/d/e")
1173
# this should also be consistent for no leading / paths
1174
# root before all else
1175
self.assertPathCompare("", "a")
1176
# alpha within a dir
1177
self.assertPathCompare("a", "b")
1178
self.assertPathCompare("b", "z")
1179
# high dirs before lower.
1180
self.assertPathCompare("z", "a/a")
1181
# except if the deeper dir should be output first
1182
self.assertPathCompare("a/b/c", "d/g")
1183
# lexical between dirs of the same height
1184
self.assertPathCompare("a/z", "z/z")
1185
self.assertPathCompare("a/c/z", "a/d/e")
1187
def test_path_prefix_sorting(self):
1188
"""Doing a sort on path prefix should match our sample data."""
1203
dir_sorted_paths = [
1219
sorted(original_paths, key=osutils.path_prefix_key))
1220
# using the comparison routine should work too:
1223
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1226
class TestCopyTree(TestCaseInTempDir):
1228
def test_copy_basic_tree(self):
    """copy_tree() reproduces files and a nested directory under the target."""
    source_layout = ['source/', 'source/a', 'source/b/', 'source/b/c']
    self.build_tree(source_layout)
    osutils.copy_tree('source', 'target')
    # listdir order is not relied upon, so sort before comparing.
    top_level = sorted(os.listdir('target'))
    self.assertEqual(['a', 'b'], top_level)
    self.assertEqual(['c'], os.listdir('target/b'))
1234
def test_copy_tree_target_exists(self):
1235
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1237
osutils.copy_tree('source', 'target')
1238
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1239
self.assertEqual(['c'], os.listdir('target/b'))
1241
def test_copy_tree_symlinks(self):
    """copy_tree() recreates a symlink in the target with the same link text."""
    self.requireFeature(SymlinkFeature)
    link_text = 'a/generic/path'
    self.build_tree(['source/'])
    os.symlink(link_text, 'source/lnk')
    osutils.copy_tree('source', 'target')
    self.assertEqual(['lnk'], os.listdir('target'))
    self.assertEqual(link_text, os.readlink('target/lnk'))
1249
def test_copy_tree_handlers(self):
1250
processed_files = []
1251
processed_links = []
1252
def file_handler(from_path, to_path):
1253
processed_files.append(('f', from_path, to_path))
1254
def dir_handler(from_path, to_path):
1255
processed_files.append(('d', from_path, to_path))
1256
def link_handler(from_path, to_path):
1257
processed_links.append((from_path, to_path))
1258
handlers = {'file':file_handler,
1259
'directory':dir_handler,
1260
'symlink':link_handler,
1263
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1264
if osutils.has_symlinks():
1265
os.symlink('a/generic/path', 'source/lnk')
1266
osutils.copy_tree('source', 'target', handlers=handlers)
1268
self.assertEqual([('d', 'source', 'target'),
1269
('f', 'source/a', 'target/a'),
1270
('d', 'source/b', 'target/b'),
1271
('f', 'source/b/c', 'target/b/c'),
1273
self.failIfExists('target')
1274
if osutils.has_symlinks():
1275
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1278
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
1279
# [bialix] 2006/12/26
1282
class TestSetUnsetEnv(TestCase):
1283
"""Test updating the environment"""
1286
super(TestSetUnsetEnv, self).setUp()
1288
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1289
'Environment was not cleaned up properly.'
1290
' Variable BZR_TEST_ENV_VAR should not exist.')
1292
if 'BZR_TEST_ENV_VAR' in os.environ:
1293
del os.environ['BZR_TEST_ENV_VAR']
1295
self.addCleanup(cleanup)
1298
"""Test that we can set an env variable"""
1299
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1300
self.assertEqual(None, old)
1301
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1303
def test_double_set(self):
    """Setting a variable twice returns the first value from the second call."""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, 'bar')
    self.assertEqual('foo', previous)
    self.assertEqual('bar', os.environ.get(name))
1310
def test_unicode(self):
1311
"""Environment can only contain plain strings
1313
So Unicode strings must be encoded.
1315
uni_val, env_val = probe_unicode_in_user_encoding()
1317
raise TestSkipped('Cannot find a unicode character that works in'
1318
' encoding %s' % (bzrlib.user_encoding,))
1320
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1321
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1323
def test_unset(self):
    """Passing None removes the variable and returns its previous value."""
    name = 'BZR_TEST_ENV_VAR'
    osutils.set_or_unset_env(name, 'foo')
    previous = osutils.set_or_unset_env(name, None)
    self.assertEqual('foo', previous)
    # Checked two ways: the lookup yields None and the key itself is gone.
    self.assertEqual(None, os.environ.get(name))
    self.failIf(name in os.environ)
1332
class TestLocalTimeOffset(TestCase):
1334
def test_local_time_offset(self):
1335
"""Test that local_time_offset() returns a sane value."""
1336
offset = osutils.local_time_offset()
1337
self.assertTrue(isinstance(offset, int))
1338
# Test that the offset is no more than eighteen hours in
1340
# Time zone handling is system specific, so it is difficult to
1341
# do more specific tests, but a value outside of this range is
1343
eighteen_hours = 18 * 3600
1344
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1346
def test_local_time_offset_with_timestamp(self):
    """local_time_offset() accepts an explicit timestamp argument."""
    timestamp = 1000000000.1234567
    offset = osutils.local_time_offset(timestamp)
    self.assertTrue(isinstance(offset, int))
    # The offset must lie strictly within eighteen hours either way.
    limit = 18 * 3600
    self.assertTrue(-limit < offset < limit)
1354
class TestShaFileByName(TestCaseInTempDir):
    """Check sha_file_by_name() against sha_string() of the same content."""

    def _check_sha_of_content(self, content):
        # Write the content to 'foo', then hash it both ways and compare.
        self.build_tree_contents([('foo', content)])
        expected_sha = osutils.sha_string(content)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_empty(self):
        self._check_sha_of_content('')

    def test_sha_mixed_endings(self):
        # Mixes \r\n, \n and \r line endings in one text.
        self._check_sha_of_content(
            'test\r\nwith\nall\rpossible line endings\r\n')
1369
r'''# Copyright (C) 2005, 2006 Canonical Ltd
1371
# This program is free software; you can redistribute it and/or modify
1372
# it under the terms of the GNU General Public License as published by
1373
# the Free Software Foundation; either version 2 of the License, or
1374
# (at your option) any later version.
1376
# This program is distributed in the hope that it will be useful,
1377
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1378
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1379
# GNU General Public License for more details.
1381
# You should have received a copy of the GNU General Public License
1382
# along with this program; if not, write to the Free Software
1383
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1386
# NOTE: If update these, please also update the help for global-options in
1387
# bzrlib/help_topics/__init__.py
1390
"""Set of flags that enable different debug behaviour.
1392
These are set with eg ``-Dlock`` on the bzr command line.
1396
* auth - show authentication sections used
1397
* error - show stack traces for all top level exceptions
1398
* evil - capture call sites that do expensive or badly-scaling operations.
1399
* fetch - trace history copying between repositories
1400
* graph - trace graph traversal information
1401
* hashcache - log every time a working file is read to determine its hash
1402
* hooks - trace hook execution
1403
* hpss - trace smart protocol requests and responses
1404
* http - trace http connections, requests and responses
1405
* index - trace major index operations
1406
* knit - trace knit operations
1407
* lock - trace when lockdir locks are taken or released
1408
* merge - emit information for debugging merges
1409
* pack - emit information about pack operations
1415
class TestResourceLoading(TestCaseInTempDir):
1417
def test_resource_string(self):
1418
# test resource in bzrlib
1419
text = osutils.resource_string('bzrlib', 'debug.py')
1420
self.assertEquals(_debug_text, text)
1421
# test resource under bzrlib
1422
text = osutils.resource_string('bzrlib.ui', 'text.py')
1423
self.assertContainsRe(text, "class TextUIFactory")
1424
# test unsupported package
1425
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1427
# test unknown resource
1428
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')