72
182
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
75
class TestSafeUnicode(TestCase):
185
class TestIsInside(tests.TestCase):
187
def test_is_inside(self):
    """Exercise osutils.is_inside with child, look-alike, equal and empty paths."""
    is_inside = osutils.is_inside
    # A direct child is inside the directory.
    self.assertTrue(is_inside('src', 'src/foo.c'))
    # Merely sharing a name prefix does not count as being inside.
    self.assertFalse(is_inside('src', 'srccontrol'))
    # Deeply nested children are inside too.
    self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
    # A path is considered inside itself.
    self.assertTrue(is_inside('foo.c', 'foo.c'))
    # The empty path is not inside 'foo.c', but everything is inside ''.
    self.assertFalse(is_inside('foo.c', ''))
    self.assertTrue(is_inside('', 'foo.c'))
196
def test_is_inside_any(self):
197
SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
198
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
199
(['src'], SRC_FOO_C),
202
self.assert_(osutils.is_inside_any(dirs, fn))
203
for dirs, fn in [(['src'], 'srccontrol'),
204
(['src'], 'srccontrol/foo')]:
205
self.assertFalse(osutils.is_inside_any(dirs, fn))
207
def test_is_inside_or_parent_of_any(self):
208
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
209
(['src'], 'src/foo.c'),
210
(['src/bar.c'], 'src'),
211
(['src/bar.c', 'bla/foo.c'], 'src'),
214
self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
216
for dirs, fn in [(['src'], 'srccontrol'),
217
(['srccontrol/foo.c'], 'src'),
218
(['src'], 'srccontrol/foo')]:
219
self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
222
class TestRmTree(tests.TestCaseInTempDir):
224
def test_rmtree(self):
225
# Check to remove tree with read-only files/dirs
227
f = file('dir/file', 'w')
230
# would like to also try making the directory readonly, but at the
231
# moment python shutil.rmtree doesn't handle that properly - it would
232
# need to chmod the directory before removing things inside it - deferred
233
# for now -- mbp 20060505
234
# osutils.make_readonly('dir')
235
osutils.make_readonly('dir/file')
237
osutils.rmtree('dir')
239
self.failIfExists('dir/file')
240
self.failIfExists('dir')
243
class TestDeleteAny(tests.TestCaseInTempDir):
245
def test_delete_any_readonly(self):
246
# from <https://bugs.launchpad.net/bzr/+bug/218206>
247
self.build_tree(['d/', 'f'])
248
osutils.make_readonly('d')
249
osutils.make_readonly('f')
251
osutils.delete_any('f')
252
osutils.delete_any('d')
255
class TestKind(tests.TestCaseInTempDir):
257
def test_file_kind(self):
258
self.build_tree(['file', 'dir/'])
259
self.assertEquals('file', osutils.file_kind('file'))
260
self.assertEquals('directory', osutils.file_kind('dir/'))
261
if osutils.has_symlinks():
262
os.symlink('symlink', 'symlink')
263
self.assertEquals('symlink', osutils.file_kind('symlink'))
265
# TODO: jam 20060529 Test a block device
267
os.lstat('/dev/null')
269
if e.errno not in (errno.ENOENT,):
272
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
274
mkfifo = getattr(os, 'mkfifo', None)
278
self.assertEquals('fifo', osutils.file_kind('fifo'))
282
AF_UNIX = getattr(socket, 'AF_UNIX', None)
284
s = socket.socket(AF_UNIX)
287
self.assertEquals('socket', osutils.file_kind('socket'))
291
def test_kind_marker(self):
    """Check the marker osutils.kind_marker returns for each known kind."""
    self.assertEqual("", osutils.kind_marker("file"))
    self.assertEqual("/", osutils.kind_marker('directory'))
    # The module-level constant must map to the same marker as 'directory'.
    self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
    self.assertEqual("@", osutils.kind_marker("symlink"))
    self.assertEqual("+", osutils.kind_marker("tree-reference"))
    # An unrecognised kind is expected to raise BzrError.
    self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
300
class TestUmask(tests.TestCaseInTempDir):
302
def test_get_umask(self):
303
if sys.platform == 'win32':
304
# umask always returns '0', no way to set it
305
self.assertEqual(0, osutils.get_umask())
308
orig_umask = osutils.get_umask()
309
self.addCleanup(os.umask, orig_umask)
311
self.assertEqual(0222, osutils.get_umask())
313
self.assertEqual(0022, osutils.get_umask())
315
self.assertEqual(0002, osutils.get_umask())
317
self.assertEqual(0027, osutils.get_umask())
320
class TestDateTime(tests.TestCase):
322
def assertFormatedDelta(self, expected, seconds):
    """Assert that osutils.format_delta(seconds) equals expected.

    :param expected: The string format_delta is expected to return.
    :param seconds: The value handed to osutils.format_delta.
    """
    actual = osutils.format_delta(seconds)
    self.assertEqual(expected, actual)
327
def test_format_delta(self):
328
self.assertFormatedDelta('0 seconds ago', 0)
329
self.assertFormatedDelta('1 second ago', 1)
330
self.assertFormatedDelta('10 seconds ago', 10)
331
self.assertFormatedDelta('59 seconds ago', 59)
332
self.assertFormatedDelta('89 seconds ago', 89)
333
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
334
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
335
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
336
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
337
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
338
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
339
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
340
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
341
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
342
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
343
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
344
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
345
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
346
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
347
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
348
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
349
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
350
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
352
# We handle when time steps the wrong direction because computers
353
# don't have synchronized clocks.
354
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
355
self.assertFormatedDelta('1 second in the future', -1)
356
self.assertFormatedDelta('2 seconds in the future', -2)
358
def test_format_date(self):
    """format_date rejects timezone='foo'; the format helpers return strings."""
    self.assertRaises(errors.UnsupportedTimezoneFormat,
                      osutils.format_date, 0, timezone='foo')
    # format_date is expected to give a byte string, format_local_date a
    # unicode string.
    self.assertIsInstance(osutils.format_date(0), str)
    self.assertIsInstance(osutils.format_local_date(0), unicode)
    # Testing for the actual value of the local weekday without
    # duplicating the code from format_date is difficult.
    # Instead blackbox.test_locale should check for localized
    # dates once they do occur in output strings.
368
def test_local_time_offset(self):
369
"""Test that local_time_offset() returns a sane value."""
370
offset = osutils.local_time_offset()
371
self.assertTrue(isinstance(offset, int))
372
# Test that the offset is no more than a eighteen hours in
374
# Time zone handling is system specific, so it is difficult to
375
# do more specific tests, but a value outside of this range is
377
eighteen_hours = 18 * 3600
378
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
380
def test_local_time_offset_with_timestamp(self):
    """Test that local_time_offset() works with a timestamp."""
    offset = osutils.local_time_offset(1000000000.1234567)
    # assertIsInstance (already used elsewhere in this file) reports the
    # offending value on failure, unlike assertTrue(isinstance(...)).
    self.assertIsInstance(offset, int)
    # The offset must lie strictly within +/- eighteen hours, in seconds.
    eighteen_hours = 18 * 3600
    self.assertTrue(-eighteen_hours < offset < eighteen_hours)
388
class TestLinks(tests.TestCaseInTempDir):
390
def test_dereference_path(self):
391
self.requireFeature(tests.SymlinkFeature)
392
cwd = osutils.realpath('.')
394
bar_path = osutils.pathjoin(cwd, 'bar')
395
# Using './' to avoid bug #1213894 (first path component not
396
# dereferenced) in Python 2.4.1 and earlier
397
self.assertEqual(bar_path, osutils.realpath('./bar'))
398
os.symlink('bar', 'foo')
399
self.assertEqual(bar_path, osutils.realpath('./foo'))
401
# Does not dereference terminal symlinks
402
foo_path = osutils.pathjoin(cwd, 'foo')
403
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
405
# Dereferences parent symlinks
407
baz_path = osutils.pathjoin(bar_path, 'baz')
408
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
410
# Dereferences parent symlinks that are the first path element
411
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
413
# Dereferences parent symlinks in absolute paths
414
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
415
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
417
def test_changing_access(self):
418
f = file('file', 'w')
422
# Make a file readonly
423
osutils.make_readonly('file')
424
mode = os.lstat('file').st_mode
425
self.assertEqual(mode, mode & 0777555)
427
# Make a file writable
428
osutils.make_writable('file')
429
mode = os.lstat('file').st_mode
430
self.assertEqual(mode, mode | 0200)
432
if osutils.has_symlinks():
433
# should not error when handed a symlink
434
os.symlink('nonexistent', 'dangling')
435
osutils.make_readonly('dangling')
436
osutils.make_writable('dangling')
438
def test_host_os_dereferences_symlinks(self):
    """Smoke test: the call must complete without raising.

    The return value is deliberately not checked.
    """
    osutils.host_os_dereferences_symlinks()
442
class TestCanonicalRelPath(tests.TestCaseInTempDir):
444
_test_needs_features = [tests.CaseInsCasePresFilenameFeature]
446
def test_canonical_relpath_simple(self):
447
f = file('MixedCaseName', 'w')
449
# Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
450
real_base_dir = osutils.realpath(self.test_base_dir)
451
actual = osutils.canonical_relpath(real_base_dir, 'mixedcasename')
452
self.failUnlessEqual('work/MixedCaseName', actual)
454
def test_canonical_relpath_missing_tail(self):
    """canonical_relpath corrects the case of an existing parent directory.

    Only 'MixedCaseParent' is created; the trailing 'nochild' component
    does not exist and is expected to come back unchanged.
    """
    os.mkdir('MixedCaseParent')
    # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
    real_base_dir = osutils.realpath(self.test_base_dir)
    actual = osutils.canonical_relpath(real_base_dir,
                                       'mixedcaseparent/nochild')
    # assertEqual replaces the deprecated failUnlessEqual alias.
    self.assertEqual('work/MixedCaseParent/nochild', actual)
463
class TestPumpFile(tests.TestCase):
464
"""Test pumpfile method."""
467
tests.TestCase.setUp(self)
468
# create a test datablock
469
self.block_size = 512
470
pattern = '0123456789ABCDEF'
471
self.test_data = pattern * (3 * self.block_size / len(pattern))
472
self.test_data_len = len(self.test_data)
474
def test_bracket_block_size(self):
475
"""Read data in blocks with the requested read size bracketing the
477
# make sure test data is larger than max read size
478
self.assertTrue(self.test_data_len > self.block_size)
480
from_file = file_utils.FakeReadFile(self.test_data)
483
# read (max / 2) bytes and verify read size wasn't affected
484
num_bytes_to_read = self.block_size / 2
485
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
486
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
487
self.assertEqual(from_file.get_read_count(), 1)
489
# read (max) bytes and verify read size wasn't affected
490
num_bytes_to_read = self.block_size
491
from_file.reset_read_count()
492
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
493
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
494
self.assertEqual(from_file.get_read_count(), 1)
496
# read (max + 1) bytes and verify read size was limited
497
num_bytes_to_read = self.block_size + 1
498
from_file.reset_read_count()
499
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
500
self.assertEqual(from_file.get_max_read_size(), self.block_size)
501
self.assertEqual(from_file.get_read_count(), 2)
503
# finish reading the rest of the data
504
num_bytes_to_read = self.test_data_len - to_file.tell()
505
osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
507
# report error if the data wasn't equal (we only report the size due
508
# to the length of the data)
509
response_data = to_file.getvalue()
510
if response_data != self.test_data:
511
message = "Data not equal. Expected %d bytes, received %d."
512
self.fail(message % (len(response_data), self.test_data_len))
514
def test_specified_size(self):
515
"""Request a transfer larger than the maximum block size and verify
516
that the maximum read doesn't exceed the block_size."""
517
# make sure test data is larger than max read size
518
self.assertTrue(self.test_data_len > self.block_size)
520
# retrieve data in blocks
521
from_file = file_utils.FakeReadFile(self.test_data)
523
osutils.pumpfile(from_file, to_file, self.test_data_len,
526
# verify read size was equal to the maximum read size
527
self.assertTrue(from_file.get_max_read_size() > 0)
528
self.assertEqual(from_file.get_max_read_size(), self.block_size)
529
self.assertEqual(from_file.get_read_count(), 3)
531
# report error if the data wasn't equal (we only report the size due
532
# to the length of the data)
533
response_data = to_file.getvalue()
534
if response_data != self.test_data:
535
message = "Data not equal. Expected %d bytes, received %d."
536
self.fail(message % (len(response_data), self.test_data_len))
538
def test_to_eof(self):
539
"""Read to end-of-file and verify that the reads are not larger than
540
the maximum read size."""
541
# make sure test data is larger than max read size
542
self.assertTrue(self.test_data_len > self.block_size)
544
# retrieve data to EOF
545
from_file = file_utils.FakeReadFile(self.test_data)
547
osutils.pumpfile(from_file, to_file, -1, self.block_size)
549
# verify read size was equal to the maximum read size
550
self.assertEqual(from_file.get_max_read_size(), self.block_size)
551
self.assertEqual(from_file.get_read_count(), 4)
553
# report error if the data wasn't equal (we only report the size due
554
# to the length of the data)
555
response_data = to_file.getvalue()
556
if response_data != self.test_data:
557
message = "Data not equal. Expected %d bytes, received %d."
558
self.fail(message % (len(response_data), self.test_data_len))
560
def test_defaults(self):
561
"""Verifies that the default arguments will read to EOF -- this
562
test verifies that any existing usages of pumpfile will not be broken
563
with this new version."""
564
# retrieve data using default (old) pumpfile method
565
from_file = file_utils.FakeReadFile(self.test_data)
567
osutils.pumpfile(from_file, to_file)
569
# report error if the data wasn't equal (we only report the size due
570
# to the length of the data)
571
response_data = to_file.getvalue()
572
if response_data != self.test_data:
573
message = "Data not equal. Expected %d bytes, received %d."
574
self.fail(message % (len(response_data), self.test_data_len))
576
def test_report_activity(self):
578
def log_activity(length, direction):
579
activity.append((length, direction))
580
from_file = StringIO(self.test_data)
582
osutils.pumpfile(from_file, to_file, buff_size=500,
583
report_activity=log_activity, direction='read')
584
self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
585
(36, 'read')], activity)
587
from_file = StringIO(self.test_data)
590
osutils.pumpfile(from_file, to_file, buff_size=500,
591
report_activity=log_activity, direction='write')
592
self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
593
(36, 'write')], activity)
595
# And with a limited amount of data
596
from_file = StringIO(self.test_data)
599
osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
600
report_activity=log_activity, direction='read')
601
self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
605
class TestPumpStringFile(tests.TestCase):
607
def test_empty(self):
609
osutils.pump_string_file("", output)
610
self.assertEqual("", output.getvalue())
612
def test_more_than_segment_size(self):
614
osutils.pump_string_file("123456789", output, 2)
615
self.assertEqual("123456789", output.getvalue())
617
def test_segment_size(self):
619
osutils.pump_string_file("12", output, 2)
620
self.assertEqual("12", output.getvalue())
622
def test_segment_size_multiple(self):
624
osutils.pump_string_file("1234", output, 2)
625
self.assertEqual("1234", output.getvalue())
628
class TestRelpath(tests.TestCase):
    """Tests for osutils.relpath."""

    def test_simple_relpath(self):
        """A direct child of the base yields just the child name."""
        cwd = osutils.getcwd()
        subdir = cwd + '/subdir'
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))

    def test_deep_relpath(self):
        """A nested child yields the full '/'-separated remainder."""
        cwd = osutils.getcwd()
        subdir = cwd + '/sub/subsubdir'
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))

    def test_not_relative(self):
        """A path that is not under the base raises PathNotChild."""
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/path', 'H:/path')
        self.assertRaises(errors.PathNotChild,
                          osutils.relpath, 'C:/', 'H:/path')
647
class TestSafeUnicode(tests.TestCase):
77
649
def test_from_ascii_string(self):
78
650
self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
87
659
self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
89
661
def test_bad_utf8_string(self):
90
self.assertRaises(BzrBadParameterNotUnicode,
662
self.assertRaises(errors.BzrBadParameterNotUnicode,
91
663
osutils.safe_unicode,
667
class TestSafeUtf8(tests.TestCase):
669
def test_from_ascii_string(self):
671
self.assertEqual('foobar', osutils.safe_utf8(f))
673
def test_from_unicode_string_ascii_contents(self):
    """A unicode string holding only ASCII comes back as the same bytes."""
    self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
676
def test_from_unicode_string_unicode_contents(self):
    """A non-ASCII unicode character is returned UTF-8 encoded."""
    self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
679
def test_from_utf8_string(self):
    """A byte string that is already valid UTF-8 is returned unchanged."""
    self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
682
def test_bad_utf8_string(self):
    """The byte string '\\xbb\\xbb' raises BzrBadParameterNotUnicode."""
    self.assertRaises(errors.BzrBadParameterNotUnicode,
                      osutils.safe_utf8, '\xbb\xbb')
687
class TestSafeRevisionId(tests.TestCase):
689
def test_from_ascii_string(self):
    """An ASCII byte string is returned unchanged by safe_revision_id."""
    # this shouldn't give a warning because it's getting an ascii string
    self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
693
def test_from_unicode_string_ascii_contents(self):
    """With warn=False, ASCII-only unicode comes back as a byte string."""
    self.assertEqual('bargam',
                     osutils.safe_revision_id(u'bargam', warn=False))
697
def test_from_unicode_deprecated(self):
    """Passing unicode without warn=False still returns the byte string.

    The call is routed through callDeprecated, which is handed
    osutils._revision_id_warning as the list of expected warnings.
    """
    result = self.callDeprecated([osutils._revision_id_warning],
                                 osutils.safe_revision_id, u'bargam')
    self.assertEqual('bargam', result)
702
def test_from_unicode_string_unicode_contents(self):
    """With warn=False, non-ASCII unicode is returned UTF-8 encoded."""
    self.assertEqual('bargam\xc2\xae',
                     osutils.safe_revision_id(u'bargam\xae', warn=False))
706
def test_from_utf8_string(self):
    """A UTF-8 byte string is returned unchanged by safe_revision_id."""
    self.assertEqual('foo\xc2\xae',
                     osutils.safe_revision_id('foo\xc2\xae'))
711
"""Currently, None is a valid revision_id"""
712
self.assertEqual(None, osutils.safe_revision_id(None))
715
class TestSafeFileId(tests.TestCase):
717
def test_from_ascii_string(self):
    """An ASCII byte string is returned unchanged by safe_file_id."""
    self.assertEqual('foobar', osutils.safe_file_id('foobar'))
720
def test_from_unicode_string_ascii_contents(self):
    """With warn=False, ASCII-only unicode comes back as a byte string."""
    self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
723
def test_from_unicode_deprecated(self):
    """Passing unicode without warn=False still returns the byte string.

    The call is routed through callDeprecated, which is handed
    osutils._file_id_warning as the list of expected warnings.
    """
    result = self.callDeprecated([osutils._file_id_warning],
                                 osutils.safe_file_id, u'bargam')
    self.assertEqual('bargam', result)
728
def test_from_unicode_string_unicode_contents(self):
    """With warn=False, non-ASCII unicode is returned UTF-8 encoded."""
    self.assertEqual('bargam\xc2\xae',
                     osutils.safe_file_id(u'bargam\xae', warn=False))
732
def test_from_utf8_string(self):
    """A UTF-8 byte string is returned unchanged by safe_file_id."""
    self.assertEqual('foo\xc2\xae',
                     osutils.safe_file_id('foo\xc2\xae'))
737
"""Currently, None is a valid revision_id"""
738
self.assertEqual(None, osutils.safe_file_id(None))
741
class TestWin32Funcs(tests.TestCase):
742
"""Test that _win32 versions of os utilities return appropriate paths."""
744
def test_abspath(self):
    """_win32_abspath returns forward-slash paths for drive and UNC input."""
    # Drive-letter paths, given with either separator.
    self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
    self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
    # UNC paths, given with either separator.
    self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
    self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
750
def test_realpath(self):
    """_win32_realpath returns 'C:/foo' for either separator style."""
    self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
    self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
754
def test_pathjoin(self):
    """Check _win32_pathjoin for relative, drive-qualified and rooted parts."""
    # Plain relative components are joined with '/'.
    self.assertEqual('path/to/foo',
                     osutils._win32_pathjoin('path', 'to', 'foo'))
    # A drive-qualified later component replaces what came before it.
    self.assertEqual('C:/foo',
                     osutils._win32_pathjoin('path\\to', 'C:\\foo'))
    self.assertEqual('C:/foo',
                     osutils._win32_pathjoin('path/to', 'C:/foo'))
    # A trailing separator on the first part does not get doubled.
    self.assertEqual('path/to/foo',
                     osutils._win32_pathjoin('path/to/', 'foo'))
    # A rooted later component also replaces the earlier parts; the
    # expected result carries no drive letter.
    self.assertEqual('/foo',
                     osutils._win32_pathjoin('C:/path/to/', '/foo'))
    self.assertEqual('/foo',
                     osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
768
def test_normpath(self):
    """_win32_normpath collapses '..', '.' and doubled separators."""
    # Same path spelled with backslashes and with forward slashes.
    self.assertEqual('path/to/foo',
                     osutils._win32_normpath(r'path\\from\..\to\.\foo'))
    self.assertEqual('path/to/foo',
                     osutils._win32_normpath('path//from/../to/./foo'))
774
def test_getcwd(self):
    """_win32_getcwd matches os.getcwdu() apart from separators and drive case."""
    cwd = osutils._win32_getcwd()
    os_cwd = os.getcwdu()
    # Compare everything after the first character, with backslashes
    # converted to forward slashes.
    self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
    # win32 is inconsistent whether it returns lower or upper case
    # and even if it was consistent the user might type the other
    # so we force it to uppercase
    # running python.exe under cmd.exe return capital C:\\
    # running win32 python inside a cygwin shell returns lowercase
    self.assertEqual(os_cwd[0].upper(), cwd[0])
785
def test_fixdrive(self):
    """_win32_fixdrive upper-cases the drive letter and nothing else."""
    self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
    self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
    # Backslashes are expected to be left as they are.
    self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
790
def test_win98_abspath(self):
792
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
793
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
795
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
796
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
798
cwd = osutils.getcwd().rstrip('/')
799
drive = osutils._nt_splitdrive(cwd)[0]
800
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
801
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
804
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
807
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
808
"""Test win32 functions that create files."""
810
def test_getcwd(self):
811
self.requireFeature(tests.UnicodeFilenameFeature)
814
# TODO: jam 20060427 This will probably fail on Mac OSX because
815
# it will change the normalization of B\xe5gfors
816
# Consider using a different unicode character, or make
817
# osutils.getcwd() renormalize the path.
818
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
820
def test_minimum_path_selection(self):
    """minimum_path_selection drops paths that sit below another given path."""
    self.assertEqual(set(),
                     osutils.minimum_path_selection([]))
    self.assertEqual(set(['a']),
                     osutils.minimum_path_selection(['a']))
    self.assertEqual(set(['a', 'b']),
                     osutils.minimum_path_selection(['a', 'b']))
    # A trailing slash on an entry is preserved in the result.
    self.assertEqual(set(['a/', 'b']),
                     osutils.minimum_path_selection(['a/', 'b']))
    # 'a/c' is dropped because 'a/' is also selected.
    self.assertEqual(set(['a/', 'b']),
                     osutils.minimum_path_selection(['a/c', 'a/', 'b']))
    # 'a-b' and 'a0b' only share a prefix with 'a' and must be kept;
    # 'a/b' is a real child of 'a' and is dropped.
    self.assertEqual(set(['a-b', 'a', 'a0b']),
                     osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
834
def test_mkdtemp(self):
    """The path returned by _win32_mkdtemp must not contain backslashes."""
    tmpdir = osutils._win32_mkdtemp(dir='.')
    self.assertFalse('\\' in tmpdir)
838
def test_rename(self):
846
osutils._win32_rename('b', 'a')
847
self.failUnlessExists('a')
848
self.failIfExists('b')
849
self.assertFileEqual('baz\n', 'a')
851
def test_rename_missing_file(self):
857
osutils._win32_rename('b', 'a')
858
except (IOError, OSError), e:
859
self.assertEqual(errno.ENOENT, e.errno)
860
self.assertFileEqual('foo\n', 'a')
862
def test_rename_missing_dir(self):
865
osutils._win32_rename('b', 'a')
866
except (IOError, OSError), e:
867
self.assertEqual(errno.ENOENT, e.errno)
869
def test_rename_current_dir(self):
872
# You can't rename the working directory
873
# doing rename non-existant . usually
874
# just raises ENOENT, since non-existant
877
osutils._win32_rename('b', '.')
878
except (IOError, OSError), e:
879
self.assertEqual(errno.ENOENT, e.errno)
881
def test_splitpath(self):
882
def check(expected, path):
883
self.assertEqual(expected, osutils.splitpath(path))
886
check(['a', 'b'], 'a/b')
887
check(['a', 'b'], 'a/./b')
888
check(['a', '.b'], 'a/.b')
889
check(['a', '.b'], 'a\\.b')
891
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
894
class TestParentDirectories(tests.TestCaseInTempDir):
    """Test osutils.parent_directories()"""

    def test_parent_directories(self):
        """Parents are listed nearest first; a single component has none."""
        self.assertEqual([], osutils.parent_directories('a'))
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
903
class TestMacFuncsDirs(tests.TestCaseInTempDir):
    """Test mac special functions that require directories."""

    def test_getcwd(self):
        """_mac_getcwd ends with the unicode directory we changed into."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        os.mkdir(u'B\xe5gfors')
        os.chdir(u'B\xe5gfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')

    def test_getcwd_nonnorm(self):
        """A name spelled 'a' + U+030A is reported in its U+00E5 spelling."""
        self.requireFeature(tests.UnicodeFilenameFeature)
        # Test that _mac_getcwd() will normalize this path
        os.mkdir(u'Ba\u030agfors')
        os.chdir(u'Ba\u030agfors')
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
920
class TestChunksToLines(tests.TestCase):
922
def test_smoketest(self):
    """chunks_to_lines yields the same lines for both chunkings below."""
    expected = ['foo\n', 'bar\n', 'baz\n']
    # Chunk boundaries that fall in the middle of lines.
    self.assertEqual(expected,
                     osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
    # Chunks that are already exactly one line each.
    self.assertEqual(expected,
                     osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
928
def test_osutils_binding(self):
929
from bzrlib.tests import test__chunks_to_lines
930
if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
931
from bzrlib._chunks_to_lines_pyx import chunks_to_lines
933
from bzrlib._chunks_to_lines_py import chunks_to_lines
934
self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
937
class TestSplitLines(tests.TestCase):
    """Tests for osutils.split_lines."""

    def test_split_unicode(self):
        """Unicode input is split on '\\n', with or without a final newline."""
        self.assertEqual([u'foo\n', u'bar\xae'],
                         osutils.split_lines(u'foo\nbar\xae'))
        self.assertEqual([u'foo\n', u'bar\xae\n'],
                         osutils.split_lines(u'foo\nbar\xae\n'))

    def test_split_with_carriage_returns(self):
        """A bare '\\r' inside a line is not treated as a line break."""
        self.assertEqual(['foo\rbar\n'],
                         osutils.split_lines('foo\rbar\n'))
950
class TestWalkDirs(tests.TestCaseInTempDir):
952
def assertExpectedBlocks(self, expected, result):
    """Assert result equals expected once each entry is cut to three fields.

    :param expected: List of (dirinfo, block) pairs to compare against.
    :param result: Iterable of (dirinfo, block) pairs; every entry in a
        block is sliced to its first three items before comparing.
    """
    trimmed = [(dirinfo, [line[0:3] for line in block])
               for dirinfo, block in result]
    self.assertEqual(expected, trimmed)
957
def test_walkdirs(self):
966
self.build_tree(tree)
967
expected_dirblocks = [
969
[('0file', '0file', 'file'),
970
('1dir', '1dir', 'directory'),
971
('2file', '2file', 'file'),
975
[('1dir/0file', '0file', 'file'),
976
('1dir/1dir', '1dir', 'directory'),
979
(('1dir/1dir', './1dir/1dir'),
986
for dirdetail, dirblock in osutils.walkdirs('.'):
987
if len(dirblock) and dirblock[0][1] == '.bzr':
988
# this tests the filtering of selected paths
991
result.append((dirdetail, dirblock))
993
self.assertTrue(found_bzrdir)
994
self.assertExpectedBlocks(expected_dirblocks, result)
995
# you can search a subdir only, with a supplied prefix.
997
for dirblock in osutils.walkdirs('./1dir', '1dir'):
998
result.append(dirblock)
999
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1001
def test_walkdirs_os_error(self):
1002
# <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1003
# Pyrex readdir didn't raise useful messages if it had an error
1004
# reading the directory
1005
if sys.platform == 'win32':
1006
raise tests.TestNotApplicable(
1007
"readdir IOError not tested on win32")
1008
os.mkdir("test-unreadable")
1009
os.chmod("test-unreadable", 0000)
1010
# must chmod it back so that it can be removed
1011
self.addCleanup(os.chmod, "test-unreadable", 0700)
1012
# The error is not raised until the generator is actually evaluated.
1013
# (It would be ok if it happened earlier but at the moment it
1015
e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1016
self.assertEquals('./test-unreadable', e.filename)
1017
self.assertEquals(errno.EACCES, e.errno)
1018
# Ensure the message contains the file name
1019
self.assertContainsRe(str(e), "\./test-unreadable")
1021
def test__walkdirs_utf8(self):
1030
self.build_tree(tree)
1031
expected_dirblocks = [
1033
[('0file', '0file', 'file'),
1034
('1dir', '1dir', 'directory'),
1035
('2file', '2file', 'file'),
1038
(('1dir', './1dir'),
1039
[('1dir/0file', '0file', 'file'),
1040
('1dir/1dir', '1dir', 'directory'),
1043
(('1dir/1dir', './1dir/1dir'),
1049
found_bzrdir = False
1050
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1051
if len(dirblock) and dirblock[0][1] == '.bzr':
1052
# this tests the filtering of selected paths
1055
result.append((dirdetail, dirblock))
1057
self.assertTrue(found_bzrdir)
1058
self.assertExpectedBlocks(expected_dirblocks, result)
1060
# you can search a subdir only, with a supplied prefix.
1062
for dirblock in osutils.walkdirs('./1dir', '1dir'):
1063
result.append(dirblock)
1064
self.assertExpectedBlocks(expected_dirblocks[1:], result)
1066
def _filter_out_stat(self, result):
1067
"""Filter out the stat value from the walkdirs result"""
1068
for dirdetail, dirblock in result:
1070
for info in dirblock:
1071
# Ignore info[3] which is the stat
1072
new_dirblock.append((info[0], info[1], info[2], info[4]))
1073
dirblock[:] = new_dirblock
1075
def _save_platform_info(self):
1076
cur_winver = win32utils.winver
1077
cur_fs_enc = osutils._fs_enc
1078
cur_dir_reader = osutils._selected_dir_reader
1080
win32utils.winver = cur_winver
1081
osutils._fs_enc = cur_fs_enc
1082
osutils._selected_dir_reader = cur_dir_reader
1083
self.addCleanup(restore)
1085
def assertDirReaderIs(self, expected):
    """Assert the right implementation for _walkdirs_utf8 is chosen.

    :param expected: The class osutils._selected_dir_reader should be an
        instance of after _walkdirs_utf8 has run.
    """
    # Force it to redetect
    osutils._selected_dir_reader = None
    # Nothing to list, but should still trigger the selection logic
    self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
    self.assertIsInstance(osutils._selected_dir_reader, expected)
1093
def test_force_walkdirs_utf8_fs_utf8(self):
    """With _fs_enc 'UTF-8' the UTF8DirReaderFeature reader is expected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    win32utils.winver = None  # Avoid the win32 detection code
    osutils._fs_enc = 'UTF-8'
    self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1100
def test_force_walkdirs_utf8_fs_ascii(self):
    """With _fs_enc 'US-ASCII' the UTF8DirReaderFeature reader is expected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    win32utils.winver = None  # Avoid the win32 detection code
    osutils._fs_enc = 'US-ASCII'
    self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1107
def test_force_walkdirs_utf8_fs_ANSI(self):
    """With _fs_enc 'ANSI_X3.4-1968' the UTF8DirReaderFeature reader is expected."""
    self.requireFeature(UTF8DirReaderFeature)
    self._save_platform_info()
    win32utils.winver = None  # Avoid the win32 detection code
    osutils._fs_enc = 'ANSI_X3.4-1968'
    self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1114
def test_force_walkdirs_utf8_fs_latin1(self):
    """With _fs_enc 'latin1' osutils.UnicodeDirReader is expected."""
    self._save_platform_info()
    win32utils.winver = None  # Avoid the win32 detection code
    osutils._fs_enc = 'latin1'
    self.assertDirReaderIs(osutils.UnicodeDirReader)
1120
def test_force_walkdirs_utf8_nt(self):
    """With winver 'Windows NT' the Win32ReadDir reader is expected."""
    # Disabled because the thunk of the whole walkdirs api is disabled.
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows NT'
    # Imported only after requireFeature has been called.
    from bzrlib._walkdirs_win32 import Win32ReadDir
    self.assertDirReaderIs(Win32ReadDir)
1128
def test_force_walkdirs_utf8_98(self):
    """With winver 'Windows 98' osutils.UnicodeDirReader is expected."""
    self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
    self._save_platform_info()
    win32utils.winver = 'Windows 98'
    self.assertDirReaderIs(osutils.UnicodeDirReader)
1134
def test_unicode_walkdirs(self):
1135
"""Walkdirs should always return unicode paths."""
1136
self.requireFeature(tests.UnicodeFilenameFeature)
1137
name0 = u'0file-\xb6'
1138
name1 = u'1dir-\u062c\u0648'
1139
name2 = u'2file-\u0633'
1143
name1 + '/' + name0,
1144
name1 + '/' + name1 + '/',
1147
self.build_tree(tree)
1148
expected_dirblocks = [
1150
[(name0, name0, 'file', './' + name0),
1151
(name1, name1, 'directory', './' + name1),
1152
(name2, name2, 'file', './' + name2),
1155
((name1, './' + name1),
1156
[(name1 + '/' + name0, name0, 'file', './' + name1
1158
(name1 + '/' + name1, name1, 'directory', './' + name1
1162
((name1 + '/' + name1, './' + name1 + '/' + name1),
1167
result = list(osutils.walkdirs('.'))
1168
self._filter_out_stat(result)
1169
self.assertEqual(expected_dirblocks, result)
1170
result = list(osutils.walkdirs(u'./'+name1, name1))
1171
self._filter_out_stat(result)
1172
self.assertEqual(expected_dirblocks[1:], result)
1174
# Exercises osutils._walkdirs_utf8('.') on names containing non-ASCII
# characters. The names are re-encoded to utf8 before expected_dirblocks is
# built. Each yielded dirdetail/dirblock is normalised: unicode abspaths are
# asserted to be unicode and encoded to utf8, the stat field (index 3) is
# dropped, and the collected result is compared with expected_dirblocks.
# NOTE(review): the docstring terminator, the `tree` assignment, parts of
# `expected_dirblocks`, and the initialisation of `result` / `new_dirblock`
# are not visible in this block; restore them from the upstream file.
def test_unicode__walkdirs_utf8(self):
1175
"""Walkdirs_utf8 should always return utf8 paths.
1177
The abspath portion might be in unicode or utf-8
1179
self.requireFeature(tests.UnicodeFilenameFeature)
1180
name0 = u'0file-\xb6'
1181
name1 = u'1dir-\u062c\u0648'
1182
name2 = u'2file-\u0633'
1186
name1 + '/' + name0,
1187
name1 + '/' + name1 + '/',
1190
self.build_tree(tree)
1191
name0 = name0.encode('utf8')
1192
name1 = name1.encode('utf8')
1193
name2 = name2.encode('utf8')
1195
expected_dirblocks = [
1197
[(name0, name0, 'file', './' + name0),
1198
(name1, name1, 'directory', './' + name1),
1199
(name2, name2, 'file', './' + name2),
1202
((name1, './' + name1),
1203
[(name1 + '/' + name0, name0, 'file', './' + name1
1205
(name1 + '/' + name1, name1, 'directory', './' + name1
1209
((name1 + '/' + name1, './' + name1 + '/' + name1),
1215
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
1216
# all abspaths are Unicode, and encode them back into utf8.
1217
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
1218
self.assertIsInstance(dirdetail[0], str)
1219
if isinstance(dirdetail[1], unicode):
1220
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
1221
dirblock = [list(info) for info in dirblock]
1222
for info in dirblock:
1223
self.assertIsInstance(info[4], unicode)
1224
info[4] = info[4].encode('utf8')
1226
for info in dirblock:
1227
self.assertIsInstance(info[0], str)
1228
self.assertIsInstance(info[1], str)
1229
self.assertIsInstance(info[4], str)
1230
# Remove the stat information
1231
new_dirblock.append((info[0], info[1], info[2], info[4]))
1232
result.append((dirdetail, new_dirblock))
1233
self.assertEqual(expected_dirblocks, result)
1235
# Forces osutils._selected_dir_reader to a UnicodeDirReader instance, then
# walks '.' with osutils._walkdirs_utf8. The expectation is built from utf8
# encoded relative names (name0/name1/name2) and unicode abspaths
# ('./' + name*u); the result is compared after self._filter_out_stat.
# NOTE(review): the docstring terminator, the `tree` assignment, and parts of
# `expected_dirblocks` are not visible in this block; two comments below are
# also cut off mid-sentence. Restore them from the upstream file.
def test__walkdirs_utf8_with_unicode_fs(self):
1236
"""UnicodeDirReader should be a safe fallback everywhere
1238
The abspath portion should be in unicode
1240
self.requireFeature(tests.UnicodeFilenameFeature)
1241
# Use the unicode reader. TODO: split into driver-and-driven unit
1243
self._save_platform_info()
1244
osutils._selected_dir_reader = osutils.UnicodeDirReader()
1245
name0u = u'0file-\xb6'
1246
name1u = u'1dir-\u062c\u0648'
1247
name2u = u'2file-\u0633'
1251
name1u + '/' + name0u,
1252
name1u + '/' + name1u + '/',
1255
self.build_tree(tree)
1256
name0 = name0u.encode('utf8')
1257
name1 = name1u.encode('utf8')
1258
name2 = name2u.encode('utf8')
1260
# All of the abspaths should be in unicode, all of the relative paths
1262
expected_dirblocks = [
1264
[(name0, name0, 'file', './' + name0u),
1265
(name1, name1, 'directory', './' + name1u),
1266
(name2, name2, 'file', './' + name2u),
1269
((name1, './' + name1u),
1270
[(name1 + '/' + name0, name0, 'file', './' + name1u
1272
(name1 + '/' + name1, name1, 'directory', './' + name1u
1276
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1281
result = list(osutils._walkdirs_utf8('.'))
1282
self._filter_out_stat(result)
1283
self.assertEqual(expected_dirblocks, result)
1285
# Forces osutils._selected_dir_reader to a Win32ReadDir instance (requires
# Win32ReadDirFeature and UnicodeFilenameFeature), then walks u'.' with
# osutils._walkdirs_utf8. Relative names are expected utf8 encoded and
# abspaths unicode; the result is compared after self._filter_out_stat.
# NOTE(review): the `tree` assignment and parts of `expected_dirblocks` are
# not visible in this block, and one comment below is cut off mid-sentence.
# Restore them from the upstream file.
def test__walkdirs_utf8_win32readdir(self):
1286
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1287
self.requireFeature(tests.UnicodeFilenameFeature)
1288
from bzrlib._walkdirs_win32 import Win32ReadDir
1289
self._save_platform_info()
1290
osutils._selected_dir_reader = Win32ReadDir()
1291
name0u = u'0file-\xb6'
1292
name1u = u'1dir-\u062c\u0648'
1293
name2u = u'2file-\u0633'
1297
name1u + '/' + name0u,
1298
name1u + '/' + name1u + '/',
1301
self.build_tree(tree)
1302
name0 = name0u.encode('utf8')
1303
name1 = name1u.encode('utf8')
1304
name2 = name2u.encode('utf8')
1306
# All of the abspaths should be in unicode, all of the relative paths
1308
expected_dirblocks = [
1310
[(name0, name0, 'file', './' + name0u),
1311
(name1, name1, 'directory', './' + name1u),
1312
(name2, name2, 'file', './' + name2u),
1315
((name1, './' + name1u),
1316
[(name1 + '/' + name0, name0, 'file', './' + name1u
1318
(name1 + '/' + name1, name1, 'directory', './' + name1u
1322
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1327
result = list(osutils._walkdirs_utf8(u'.'))
1328
self._filter_out_stat(result)
1329
self.assertEqual(expected_dirblocks, result)
1331
def assertStatIsCorrect(self, path, win32stat):
    """Assert that win32stat agrees with what os.stat(path) reports.

    :param path: Path passed to os.stat() to obtain the reference values.
    :param win32stat: Stat-like object exposing st_size, st_mtime, st_ctime,
        st_atime, st_dev, st_ino and st_mode.
    """
    os_stat = os.stat(path)
    self.assertEqual(os_stat.st_size, win32stat.st_size)
    # Timestamps are compared to 4 decimal places rather than exactly.
    self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
    self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
    self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
    self.assertEqual(os_stat.st_dev, win32stat.st_dev)
    self.assertEqual(os_stat.st_ino, win32stat.st_ino)
    self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1341
# Builds one non-ASCII-named file, appends to it, then reads '.' with
# Win32ReadDir().read_dir('', u'.'). The entry's first three fields, its
# abspath (index 4) and its stat (index 3, via assertStatIsCorrect) are
# checked, and its mtime and ctime must differ.
# NOTE(review): the assignment of `entry`, the statements around the
# open()/write() call, and the rest of the sleep() comment are not visible in
# this block; restore them from the upstream file.
def test__walkdirs_utf_win32_find_file_stat_file(self):
1342
"""make sure our Stat values are valid"""
1343
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1344
self.requireFeature(tests.UnicodeFilenameFeature)
1345
from bzrlib._walkdirs_win32 import Win32ReadDir
1346
name0u = u'0file-\xb6'
1347
name0 = name0u.encode('utf8')
1348
self.build_tree([name0u])
1349
# I hate to sleep() here, but I'm trying to make the ctime different
1352
f = open(name0u, 'ab')
1354
f.write('just a small update')
1358
result = Win32ReadDir().read_dir('', u'.')
1360
self.assertEqual((name0, name0, 'file'), entry[:3])
1361
self.assertEqual(u'./' + name0u, entry[4])
1362
self.assertStatIsCorrect(entry[4], entry[3])
1363
self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1365
# Builds one non-ASCII-named directory and reads '.' with
# Win32ReadDir().read_dir('', u'.'). The entry's first three fields, its
# abspath (index 4) and its stat (index 3, via assertStatIsCorrect) are
# checked.
# NOTE(review): the assignment of `entry` is not visible in this block;
# restore it from the upstream file.
def test__walkdirs_utf_win32_find_file_stat_directory(self):
1366
"""make sure our Stat values are valid"""
1367
self.requireFeature(test__walkdirs_win32.Win32ReadDirFeature)
1368
self.requireFeature(tests.UnicodeFilenameFeature)
1369
from bzrlib._walkdirs_win32 import Win32ReadDir
1370
name0u = u'0dir-\u062c\u0648'
1371
name0 = name0u.encode('utf8')
1372
self.build_tree([name0u + '/'])
1374
result = Win32ReadDir().read_dir('', u'.')
1376
self.assertEqual((name0, name0, 'directory'), entry[:3])
1377
self.assertEqual(u'./' + name0u, entry[4])
1378
self.assertStatIsCorrect(entry[4], entry[3])
1380
def assertPathCompare(self, path_less, path_greater):
    """Check that path_less and path_greater compare correctly.

    Each path must compare equal (0) to itself, path_less must compare as
    -1 against path_greater, and path_greater as 1 against path_less.

    :param path_less: Path expected to sort first.
    :param path_greater: Path expected to sort second.
    """
    compare = osutils.compare_paths_prefix_order
    self.assertEqual(0, compare(path_less, path_less))
    self.assertEqual(0, compare(path_greater, path_greater))
    self.assertEqual(-1, compare(path_less, path_greater))
    self.assertEqual(1, compare(path_greater, path_less))
1391
def test_compare_paths_prefix_order(self):
    """Check the ordering rules with and without a leading '/'."""
    # root before all else
    self.assertPathCompare("/", "/a")
    # alpha within a dir
    self.assertPathCompare("/a", "/b")
    self.assertPathCompare("/b", "/z")
    # high dirs before lower.
    self.assertPathCompare("/z", "/a/a")
    # except if the deeper dir should be output first
    self.assertPathCompare("/a/b/c", "/d/g")
    # lexical between dirs of the same height
    self.assertPathCompare("/a/z", "/z/z")
    self.assertPathCompare("/a/c/z", "/a/d/e")

    # this should also be consistent for no leading / paths
    # root before all else
    self.assertPathCompare("", "a")
    # alpha within a dir
    self.assertPathCompare("a", "b")
    self.assertPathCompare("b", "z")
    # high dirs before lower.
    self.assertPathCompare("z", "a/a")
    # except if the deeper dir should be output first
    self.assertPathCompare("a/b/c", "d/g")
    # lexical between dirs of the same height
    self.assertPathCompare("a/z", "z/z")
    self.assertPathCompare("a/c/z", "a/d/e")
1419
# Sorts `original_paths` both with key=osutils.path_prefix_key and with
# cmp=osutils.compare_paths_prefix_order.
# NOTE(review): the sample data (`original_paths`, the contents of
# `dir_sorted_paths`) and the assertions wrapping the two sorted() calls are
# not visible in this block; restore them from the upstream file.
def test_path_prefix_sorting(self):
1420
"""Doing a sort on path prefix should match our sample data."""
1435
dir_sorted_paths = [
1451
sorted(original_paths, key=osutils.path_prefix_key))
1452
# using the comparison routine should work too:
1455
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1458
class TestCopyTree(tests.TestCaseInTempDir):
1460
def test_copy_basic_tree(self):
    """copy_tree reproduces a small tree of files and directories."""
    self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
    osutils.copy_tree('source', 'target')
    # os.listdir() order is arbitrary, so sort the two-entry listing.
    self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
    self.assertEqual(['c'], os.listdir('target/b'))
1466
# Copies 'source' to 'target' with osutils.copy_tree and checks the listing
# of 'target' and 'target/b'.
# NOTE(review): the build_tree() list literal is cut off after
# 'source/b/c'; presumably the missing continuation pre-creates the target
# directory (per the test name). Confirm against the upstream file.
def test_copy_tree_target_exists(self):
1467
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1469
osutils.copy_tree('source', 'target')
1470
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1471
self.assertEqual(['c'], os.listdir('target/b'))
1473
def test_copy_tree_symlinks(self):
    """copy_tree copies a symlink as a symlink with the same target."""
    self.requireFeature(tests.SymlinkFeature)
    self.build_tree(['source/'])
    os.symlink('a/generic/path', 'source/lnk')
    osutils.copy_tree('source', 'target')
    self.assertEqual(['lnk'], os.listdir('target'))
    self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1481
# Passes recording handlers for 'file', 'directory' and 'symlink' to
# osutils.copy_tree. File and directory calls are appended to
# processed_files (tagged 'f' / 'd'); symlink calls go to processed_links.
# As the handlers only record, 'target' must not exist afterwards.
# The symlink part only runs when osutils.has_symlinks() is true.
# NOTE(review): the closing of the `handlers` dict and the tail of the first
# assertEqual (its second argument) are not visible in this block; restore
# them from the upstream file.
def test_copy_tree_handlers(self):
1482
processed_files = []
1483
processed_links = []
1484
def file_handler(from_path, to_path):
1485
processed_files.append(('f', from_path, to_path))
1486
def dir_handler(from_path, to_path):
1487
processed_files.append(('d', from_path, to_path))
1488
def link_handler(from_path, to_path):
1489
processed_links.append((from_path, to_path))
1490
handlers = {'file':file_handler,
1491
'directory':dir_handler,
1492
'symlink':link_handler,
1495
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1496
if osutils.has_symlinks():
1497
os.symlink('a/generic/path', 'source/lnk')
1498
osutils.copy_tree('source', 'target', handlers=handlers)
1500
self.assertEqual([('d', 'source', 'target'),
1501
('f', 'source/a', 'target/a'),
1502
('d', 'source/b', 'target/b'),
1503
('f', 'source/b/c', 'target/b/c'),
1505
self.failIfExists('target')
1506
if osutils.has_symlinks():
1507
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1510
# Tests for osutils.set_or_unset_env, all using the BZR_TEST_ENV_VAR
# environment variable.
# The first fragment below (up to self.addCleanup(cleanup)) checks that the
# variable is unset before each test and registers a cleanup that deletes it.
# The second fragment sets the variable to 'foo' and checks that the previous
# value returned is None.
# NOTE(review): the `def` lines for these fragments (presumably setUp, a
# nested cleanup function and a test method) are not visible in this block;
# restore them from the upstream file.
class TestSetUnsetEnv(tests.TestCase):
1511
"""Test updating the environment"""
1514
super(TestSetUnsetEnv, self).setUp()
1516
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1517
'Environment was not cleaned up properly.'
1518
' Variable BZR_TEST_ENV_VAR should not exist.')
1520
if 'BZR_TEST_ENV_VAR' in os.environ:
1521
del os.environ['BZR_TEST_ENV_VAR']
1523
self.addCleanup(cleanup)
1526
"""Test that we can set an env variable"""
1527
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1528
self.assertEqual(None, old)
1529
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1531
def test_double_set(self):
    """Test that we get the old value out"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    # The second call must hand back the value stored by the first.
    old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
    self.assertEqual('foo', old)
    self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1538
# Sets BZR_TEST_ENV_VAR to the unicode value returned by
# tests.probe_unicode_in_user_encoding() and checks that os.environ then
# holds the matching encoded value (env_val).
# NOTE(review): the docstring terminator and the condition guarding the
# TestSkipped raise are not visible in this block; restore them from the
# upstream file.
def test_unicode(self):
1539
"""Environment can only contain plain strings
1541
So Unicode strings must be encoded.
1543
uni_val, env_val = tests.probe_unicode_in_user_encoding()
1545
raise tests.TestSkipped(
1546
'Cannot find a unicode character that works in encoding %s'
1547
% (osutils.get_user_encoding(),))
1549
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1550
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1552
def test_unset(self):
    """Test that passing None will remove the env var"""
    osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
    old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
    self.assertEqual('foo', old)
    self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
    # get() returning None is not enough: the key itself must be gone.
    self.assertFalse('BZR_TEST_ENV_VAR' in os.environ)
1561
# Tests for osutils.size_sha_file(f). For an empty file and for a file with
# mixed line endings, the returned size must be the byte count written (0 and
# 38) and the sha must equal osutils.sha_string of the same text.
# NOTE(review): the statements that assign `f` in both tests are not visible
# in this block; restore them from the upstream file.
class TestSizeShaFile(tests.TestCaseInTempDir):
1563
def test_sha_empty(self):
1564
self.build_tree_contents([('foo', '')])
1565
expected_sha = osutils.sha_string('')
1567
self.addCleanup(f.close)
1568
size, sha = osutils.size_sha_file(f)
1569
self.assertEqual(0, size)
1570
self.assertEqual(expected_sha, sha)
1572
def test_sha_mixed_endings(self):
1573
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1574
self.build_tree_contents([('foo', text)])
1575
expected_sha = osutils.sha_string(text)
1577
self.addCleanup(f.close)
1578
size, sha = osutils.size_sha_file(f)
1579
self.assertEqual(38, size)
1580
self.assertEqual(expected_sha, sha)
1583
class TestShaFileByName(tests.TestCaseInTempDir):
    """Tests for osutils.sha_file_by_name."""

    def test_sha_empty(self):
        """An empty file hashes the same as the empty string."""
        self.build_tree_contents([('foo', '')])
        expected_sha = osutils.sha_string('')
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))

    def test_sha_mixed_endings(self):
        """A file with mixed line endings hashes the same as its text."""
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
        self.build_tree_contents([('foo', text)])
        expected_sha = osutils.sha_string(text)
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1597
# Tests for osutils.resource_string: loading from the bzrlib package, from a
# sub-package (bzrlib.ui), from an unsupported package (expects
# errors.BzrError) and for an unknown resource name (expects IOError).
# NOTE(review): the continuation line of the first assertRaises call is not
# visible in this block; restore it from the upstream file.
# NOTE(review): "debug_flags = set()" is used as a regex, so the unescaped
# "()" is an empty group and the pattern matches "debug_flags = set" without
# requiring the parentheses. Consider escaping them as r"set\(\)".
class TestResourceLoading(tests.TestCaseInTempDir):
1599
def test_resource_string(self):
1600
# test resource in bzrlib
1601
text = osutils.resource_string('bzrlib', 'debug.py')
1602
self.assertContainsRe(text, "debug_flags = set()")
1603
# test resource under bzrlib
1604
text = osutils.resource_string('bzrlib.ui', 'text.py')
1605
self.assertContainsRe(text, "class TextUIFactory")
1606
# test unsupported package
1607
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1609
# test unknown resource
1610
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1613
class TestReCompile(tests.TestCase):
1615
def test_re_compile_checked(self):
    """re_compile_checked applies the flags it is given."""
    r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
    # 'A*' also matches the empty string, so a bare truth test on the match
    # object would pass even if IGNORECASE were dropped. Check the matched
    # text to prove the lower-case input was really consumed.
    for text in ('aaaa', 'aAaA'):
        match = r.match(text)
        self.assertTrue(match)
        self.assertEqual(text, match.group())
1620
# Compiling the invalid pattern '*' through osutils.re_compile_checked must
# raise errors.BzrCommandError; the expected message text names the pattern
# and the 'test case' context string.
# NOTE(review): the call that wraps the expected message (presumably an
# assertEqual against str(err)) is not visible in this block; restore it from
# the upstream file.
def test_re_compile_checked_error(self):
1621
# like https://bugs.launchpad.net/bzr/+bug/251352
1622
err = self.assertRaises(
1623
errors.BzrCommandError,
1624
osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1626
"Invalid regular expression in test case: '*': "
1627
"nothing to repeat",
1631
# Directory-reader tests driven by two class attributes, both None here:
#   _dir_reader_class  -- called with no arguments to build the reader that
#                         is installed as osutils._selected_dir_reader.
#   _native_to_unicode -- applied to abspaths by _filter_out().
# The set-up fragment below saves the current osutils._selected_dir_reader,
# registers a cleanup that restores it, then installs the reader under test.
# NOTE(review): presumably the attributes are supplied by test
# parameterisation; verify. The `def` lines for the set-up method and its
# nested restore function are not visible in this block; restore them from
# the upstream file.
class TestDirReader(tests.TestCaseInTempDir):
1634
_dir_reader_class = None
1635
_native_to_unicode = None
1638
tests.TestCaseInTempDir.setUp(self)
1640
# Save platform specific info and reset it
1641
cur_dir_reader = osutils._selected_dir_reader
1644
osutils._selected_dir_reader = cur_dir_reader
1645
self.addCleanup(restore)
1647
osutils._selected_dir_reader = self._dir_reader_class()
1649
# Returns a (tree, expected_dirblocks) pair for an ASCII-only layout. Each
# expected entry is a (relpath, basename, kind) triple, i.e. without stat or
# abspath.
# NOTE(review): the `tree` assignment and parts of the `expected_dirblocks`
# literal are not visible in this block; restore them from the upstream file.
def _get_ascii_tree(self):
1657
expected_dirblocks = [
1659
[('0file', '0file', 'file'),
1660
('1dir', '1dir', 'directory'),
1661
('2file', '2file', 'file'),
1664
(('1dir', './1dir'),
1665
[('1dir/0file', '0file', 'file'),
1666
('1dir/1dir', '1dir', 'directory'),
1669
(('1dir/1dir', './1dir/1dir'),
1674
return tree, expected_dirblocks
1676
def test_walk_cur_dir(self):
    """Walking '.' yields every dirblock of the ASCII tree."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    result = list(osutils._walkdirs_utf8('.'))
    # Filter out stat and abspath
    self.assertEqual(expected_dirblocks,
                     [(dirinfo, [line[0:3] for line in block])
                      for dirinfo, block in result])
1685
def test_walk_sub_dir(self):
    """Walking './1dir' with a prefix yields all but the first dirblock."""
    tree, expected_dirblocks = self._get_ascii_tree()
    self.build_tree(tree)
    # you can search a subdir only, with a supplied prefix.
    result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
    # Filter out stat and abspath
    self.assertEqual(expected_dirblocks[1:],
                     [(dirinfo, [line[0:3] for line in block])
                      for dirinfo, block in result])
1695
# Returns a (tree, expected_dirblocks) pair for a layout with non-ASCII
# names. In the expectation, relative paths and basenames are UTF-8 encoded
# (name0/name1/name2) while abspaths are built from the unicode names.
# NOTE(review): the `tree` assignment and parts of the `expected_dirblocks`
# literal are not visible in this block; restore them from the upstream file.
def _get_unicode_tree(self):
1696
name0u = u'0file-\xb6'
1697
name1u = u'1dir-\u062c\u0648'
1698
name2u = u'2file-\u0633'
1702
name1u + '/' + name0u,
1703
name1u + '/' + name1u + '/',
1706
name0 = name0u.encode('UTF-8')
1707
name1 = name1u.encode('UTF-8')
1708
name2 = name2u.encode('UTF-8')
1709
expected_dirblocks = [
1711
[(name0, name0, 'file', './' + name0u),
1712
(name1, name1, 'directory', './' + name1u),
1713
(name2, name2, 'file', './' + name2u),
1716
((name1, './' + name1u),
1717
[(name1 + '/' + name0, name0, 'file', './' + name1u
1719
(name1 + '/' + name1, name1, 'directory', './' + name1u
1723
((name1 + '/' + name1, './' + name1u + '/' + name1u),
1728
return tree, expected_dirblocks
1730
# Normalises raw _walkdirs_utf8 output for comparison. For every
# (dirinfo, block) pair the directory abspath (dirinfo[1]) is passed through
# self._native_to_unicode; each entry keeps fields 0-2 and gets its abspath
# (index 4) converted the same way, which drops the stat field (index 3).
# Returns the list of (dirinfo, details) pairs.
# NOTE(review): the docstring terminator, the initialisation of `details`
# and the loop header that binds `line` are not visible in this block;
# restore them from the upstream file.
def _filter_out(self, raw_dirblocks):
1731
"""Filter out a walkdirs_utf8 result.
1733
stat field is removed, all native paths are converted to unicode
1735
filtered_dirblocks = []
1736
for dirinfo, block in raw_dirblocks:
1737
dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1740
details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1741
filtered_dirblocks.append((dirinfo, details))
1742
return filtered_dirblocks
1744
def test_walk_unicode_tree(self):
    """Walking a tree with non-ASCII names yields the expected dirblocks."""
    self.requireFeature(tests.UnicodeFilenameFeature)
    tree, expected_dirblocks = self._get_unicode_tree()
    self.build_tree(tree)
    result = list(osutils._walkdirs_utf8('.'))
    # _filter_out drops the stat field and converts abspaths to unicode.
    self.assertEqual(expected_dirblocks, self._filter_out(result))
1751
# Creates a symlink whose name and target both contain a Euro sign, walks
# '.' with osutils._walkdirs_utf8 and expects one entry of kind 'symlink'
# with a UTF-8 encoded relative name and a unicode abspath (after
# self._filter_out).
# NOTE(review): the opening and closing lines of the `expected_dirblocks`
# literal are not visible in this block; restore them from the upstream file.
# `target_utf8` is assigned but not used in the visible lines.
def test_symlink(self):
1752
self.requireFeature(tests.SymlinkFeature)
1753
self.requireFeature(tests.UnicodeFilenameFeature)
1754
target = u'target\N{Euro Sign}'
1755
link_name = u'l\N{Euro Sign}nk'
1756
os.symlink(target, link_name)
1757
target_utf8 = target.encode('UTF-8')
1758
link_name_utf8 = link_name.encode('UTF-8')
1759
expected_dirblocks = [
1761
[(link_name_utf8, link_name_utf8,
1762
'symlink', './' + link_name),],
1764
result = list(osutils._walkdirs_utf8('.'))
1765
self.assertEqual(expected_dirblocks, self._filter_out(result))
1768
# Tests documenting how os.readlink() treats unicode versus byte-string
# link names. The set-up fragment creates a symlink whose name (self.link)
# and target (self.target) both contain a Euro sign; the class requires
# SymlinkFeature and UnicodeFilenameFeature.
# NOTE(review): super(tests.TestCaseInTempDir, self).setUp() starts the MRO
# lookup *after* TestCaseInTempDir, so TestCaseInTempDir.setUp itself is
# skipped. The conventional spelling is super(TestReadLink, self).setUp();
# confirm whether the skip is intentional.
# NOTE(review): the end of the class docstring and the `def` line of the
# set-up method are not visible in this block; restore them from the
# upstream file.
class TestReadLink(tests.TestCaseInTempDir):
1769
"""Exposes os.readlink() problems and the osutils solution.
1771
The only guarantee offered by os.readlink(), starting with 2.6, is that a
1772
unicode string will be returned if a unicode string is passed.
1774
But prior python versions failed to properly encode the passed unicode
1777
_test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1780
super(tests.TestCaseInTempDir, self).setUp()
1781
self.link = u'l\N{Euro Sign}ink'
1782
self.target = u'targe\N{Euro Sign}t'
1783
os.symlink(self.target, self.link)
1785
# With a unicode link name: on Python older than 2.6, os.readlink() is
# expected to raise UnicodeEncodeError; the other visible assertion expects
# it to return the unicode target.
# NOTE(review): the branch keyword separating the two assertions (presumably
# `else:`) is not visible in this block; restore it from the upstream file.
def test_os_readlink_link_encoding(self):
1786
if sys.version_info < (2, 6):
1787
self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1789
self.assertEquals(self.target, os.readlink(self.link))
1791
def test_os_readlink_link_decoding(self):
    """A byte-string link name yields the byte-string target.

    Both the link name passed in and the expected target are encoded with
    osutils._fs_enc.
    """
    self.assertEqual(self.target.encode(osutils._fs_enc),
                     os.readlink(self.link.encode(osutils._fs_enc)))
1796
class TestConcurrency(tests.TestCase):
    """Tests for osutils.local_concurrency."""

    def test_local_concurrency(self):
        """local_concurrency() returns an int."""
        concurrency = osutils.local_concurrency()
        self.assertIsInstance(concurrency, int)