~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Vincent Ladeuil
  • Date: 2007-06-06 13:52:02 UTC
  • mto: (2485.8.44 bzr.connection.sharing)
  • mto: This revision was merged to the branch mainline in revision 2646.
  • Revision ID: v.ladeuil+lp@free.fr-20070606135202-mqhxcv6z57uce434
Fix merge multiple connections. Test suite *not* passing (sftp
refactoring pending but unrelated to merge).

* bzrlib/builtins.py:
(cmd_merge.run): Fix the multiple connections bug by reusing the
tramsport used to check for a bundle and keep all other used
transports in possible_transports.
(_merge_helper): Add a possible_transports parameter for
reuse.

* bzrlib/transport/__init__.py:
(Transport._reuse_for): By default, Transports are not reusable.
(ConnectedTransport._reuse_for): ConnectedTransports are reusable
under certain conditions.
(_urlRE): Fix misleading group name.
(_try_transport_factories): Moved after get_transport (another use
case for moved lines). The do_catching_redirections was
incorrectly inserted between get_transport and
_try_transport_factories.

* bzrlib/tests/test_transport.py:
(TestReusedTransports.test_reuse_same_transport)
(TestReusedTransports.test_don_t_reuse_different_transport): Add
more tests.

* bzrlib/merge.py:
(_get_tree, Merger.set_other): Add a possible_transports parameter
for reuse.

* bzrlib/bzrdir.py:
(BzrDir.open_containing): Add a possible_transports parameter for
reuse.

* bzrlib/branch.py:
(Branch.open_containing): Add a possible_transports parameter for
reuse.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
20
19
import errno
21
20
import os
22
21
import socket
23
22
import stat
24
23
import sys
25
 
import time
26
24
 
 
25
import bzrlib
27
26
from bzrlib import (
28
27
    errors,
29
28
    osutils,
30
 
    tests,
31
29
    win32utils,
32
30
    )
33
31
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
34
 
from bzrlib.osutils import (
35
 
        is_inside_any,
36
 
        is_inside_or_parent_of_any,
37
 
        pathjoin,
38
 
        pumpfile,
39
 
        pump_string_file,
40
 
        canonical_relpath,
41
 
        )
42
32
from bzrlib.tests import (
43
 
        adapt_tests,
44
 
        Feature,
45
 
        probe_unicode_in_user_encoding,
46
 
        split_suite_by_re,
47
33
        StringIOWrapper,
48
 
        SymlinkFeature,
49
 
        CaseInsCasePresFilenameFeature,
50
 
        TestCase,
51
 
        TestCaseInTempDir,
52
 
        TestScenarioApplier,
 
34
        TestCase, 
 
35
        TestCaseInTempDir, 
53
36
        TestSkipped,
54
37
        )
55
 
from bzrlib.tests.file_utils import (
56
 
    FakeReadFile,
57
 
    )
58
 
from bzrlib.tests.test__walkdirs_win32 import Win32ReadDirFeature
59
 
 
60
 
 
61
 
class _UTF8DirReaderFeature(Feature):
62
 
 
63
 
    def _probe(self):
64
 
        try:
65
 
            from bzrlib import _readdir_pyx
66
 
            self.reader = _readdir_pyx.UTF8DirReader
67
 
            return True
68
 
        except ImportError:
69
 
            return False
70
 
 
71
 
    def feature_name(self):
72
 
        return 'bzrlib._readdir_pyx'
73
 
 
74
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
75
38
 
76
39
 
77
40
class TestOSUtils(TestCaseInTempDir):
123
86
 
124
87
    # TODO: test fancy_rename using a MemoryTransport
125
88
 
126
 
    def test_rename_change_case(self):
127
 
        # on Windows we should be able to change filename case by rename
128
 
        self.build_tree(['a', 'b/'])
129
 
        osutils.rename('a', 'A')
130
 
        osutils.rename('b', 'B')
131
 
        # we can't use failUnlessExists on case-insensitive filesystem
132
 
        # so try to check shape of the tree
133
 
        shape = sorted(os.listdir('.'))
134
 
        self.assertEquals(['A', 'B'], shape)
135
 
 
136
89
    def test_01_rand_chars_empty(self):
137
90
        result = osutils.rand_chars(0)
138
91
        self.assertEqual(result, '')
152
105
        self.assertFalse(is_inside('foo.c', ''))
153
106
        self.assertTrue(is_inside('', 'foo.c'))
154
107
 
155
 
    def test_is_inside_any(self):
156
 
        SRC_FOO_C = pathjoin('src', 'foo.c')
157
 
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
158
 
                         (['src'], SRC_FOO_C),
159
 
                         (['src'], 'src'),
160
 
                         ]:
161
 
            self.assert_(is_inside_any(dirs, fn))
162
 
        for dirs, fn in [(['src'], 'srccontrol'),
163
 
                         (['src'], 'srccontrol/foo')]:
164
 
            self.assertFalse(is_inside_any(dirs, fn))
165
 
 
166
 
    def test_is_inside_or_parent_of_any(self):
167
 
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
168
 
                         (['src'], 'src/foo.c'),
169
 
                         (['src/bar.c'], 'src'),
170
 
                         (['src/bar.c', 'bla/foo.c'], 'src'),
171
 
                         (['src'], 'src'),
172
 
                         ]:
173
 
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
174
 
            
175
 
        for dirs, fn in [(['src'], 'srccontrol'),
176
 
                         (['srccontrol/foo.c'], 'src'),
177
 
                         (['src'], 'srccontrol/foo')]:
178
 
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
179
 
 
180
108
    def test_rmtree(self):
181
109
        # Check to remove tree with read-only files/dirs
182
110
        os.mkdir('dir')
290
218
        self.assertFormatedDelta('1 second in the future', -1)
291
219
        self.assertFormatedDelta('2 seconds in the future', -2)
292
220
 
293
 
    def test_format_date(self):
294
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
295
 
            osutils.format_date, 0, timezone='foo')
296
 
        self.assertIsInstance(osutils.format_date(0), str)
297
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
298
 
        # Testing for the actual value of the local weekday without
299
 
        # duplicating the code from format_date is difficult.
300
 
        # Instead blackbox.test_locale should check for localized
301
 
        # dates once they do occur in output strings.
302
 
 
303
221
    def test_dereference_path(self):
304
 
        self.requireFeature(SymlinkFeature)
 
222
        if not osutils.has_symlinks():
 
223
            raise TestSkipped('Symlinks are not supported on this platform')
305
224
        cwd = osutils.realpath('.')
306
225
        os.mkdir('bar')
307
226
        bar_path = osutils.pathjoin(cwd, 'bar')
327
246
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
328
247
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
329
248
 
330
 
    def test_changing_access(self):
331
 
        f = file('file', 'w')
332
 
        f.write('monkey')
333
 
        f.close()
334
 
 
335
 
        # Make a file readonly
336
 
        osutils.make_readonly('file')
337
 
        mode = os.lstat('file').st_mode
338
 
        self.assertEqual(mode, mode & 0777555)
339
 
 
340
 
        # Make a file writable
341
 
        osutils.make_writable('file')
342
 
        mode = os.lstat('file').st_mode
343
 
        self.assertEqual(mode, mode | 0200)
344
 
 
345
 
        if osutils.has_symlinks():
346
 
            # should not error when handed a symlink
347
 
            os.symlink('nonexistent', 'dangling')
348
 
            osutils.make_readonly('dangling')
349
 
            osutils.make_writable('dangling')
350
249
 
351
250
    def test_kind_marker(self):
352
251
        self.assertEqual("", osutils.kind_marker("file"))
354
253
        self.assertEqual("@", osutils.kind_marker("symlink"))
355
254
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
356
255
 
357
 
    def test_host_os_dereferences_symlinks(self):
358
 
        osutils.host_os_dereferences_symlinks()
359
 
 
360
 
 
361
 
class TestCanonicalRelPath(TestCaseInTempDir):
362
 
 
363
 
    _test_needs_features = [CaseInsCasePresFilenameFeature]
364
 
 
365
 
    def test_canonical_relpath_simple(self):
366
 
        f = file('MixedCaseName', 'w')
367
 
        f.close()
368
 
        self.failUnlessEqual(
369
 
            canonical_relpath(self.test_base_dir, 'mixedcasename'),
370
 
            'work/MixedCaseName')
371
 
 
372
 
    def test_canonical_relpath_missing_tail(self):
373
 
        os.mkdir('MixedCaseParent')
374
 
        self.failUnlessEqual(
375
 
            canonical_relpath(self.test_base_dir, 'mixedcaseparent/nochild'),
376
 
            'work/MixedCaseParent/nochild')
377
 
 
378
 
 
379
 
class TestPumpFile(TestCase):
380
 
    """Test pumpfile method."""
381
 
    def setUp(self):
382
 
        # create a test datablock
383
 
        self.block_size = 512
384
 
        pattern = '0123456789ABCDEF'
385
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
386
 
        self.test_data_len = len(self.test_data)
387
 
 
388
 
    def test_bracket_block_size(self):
389
 
        """Read data in blocks with the requested read size bracketing the
390
 
        block size."""
391
 
        # make sure test data is larger than max read size
392
 
        self.assertTrue(self.test_data_len > self.block_size)
393
 
 
394
 
        from_file = FakeReadFile(self.test_data)
395
 
        to_file = StringIO()
396
 
 
397
 
        # read (max / 2) bytes and verify read size wasn't affected
398
 
        num_bytes_to_read = self.block_size / 2
399
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
400
 
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
401
 
        self.assertEqual(from_file.get_read_count(), 1)
402
 
 
403
 
        # read (max) bytes and verify read size wasn't affected
404
 
        num_bytes_to_read = self.block_size
405
 
        from_file.reset_read_count()
406
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
407
 
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
408
 
        self.assertEqual(from_file.get_read_count(), 1)
409
 
 
410
 
        # read (max + 1) bytes and verify read size was limited
411
 
        num_bytes_to_read = self.block_size + 1
412
 
        from_file.reset_read_count()
413
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
414
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
415
 
        self.assertEqual(from_file.get_read_count(), 2)
416
 
 
417
 
        # finish reading the rest of the data
418
 
        num_bytes_to_read = self.test_data_len - to_file.tell()
419
 
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
420
 
 
421
 
        # report error if the data wasn't equal (we only report the size due
422
 
        # to the length of the data)
423
 
        response_data = to_file.getvalue()
424
 
        if response_data != self.test_data:
425
 
            message = "Data not equal.  Expected %d bytes, received %d."
426
 
            self.fail(message % (len(response_data), self.test_data_len))
427
 
 
428
 
    def test_specified_size(self):
429
 
        """Request a transfer larger than the maximum block size and verify
430
 
        that the maximum read doesn't exceed the block_size."""
431
 
        # make sure test data is larger than max read size
432
 
        self.assertTrue(self.test_data_len > self.block_size)
433
 
 
434
 
        # retrieve data in blocks
435
 
        from_file = FakeReadFile(self.test_data)
436
 
        to_file = StringIO()
437
 
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
438
 
 
439
 
        # verify read size was equal to the maximum read size
440
 
        self.assertTrue(from_file.get_max_read_size() > 0)
441
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
442
 
        self.assertEqual(from_file.get_read_count(), 3)
443
 
 
444
 
        # report error if the data wasn't equal (we only report the size due
445
 
        # to the length of the data)
446
 
        response_data = to_file.getvalue()
447
 
        if response_data != self.test_data:
448
 
            message = "Data not equal.  Expected %d bytes, received %d."
449
 
            self.fail(message % (len(response_data), self.test_data_len))
450
 
 
451
 
    def test_to_eof(self):
452
 
        """Read to end-of-file and verify that the reads are not larger than
453
 
        the maximum read size."""
454
 
        # make sure test data is larger than max read size
455
 
        self.assertTrue(self.test_data_len > self.block_size)
456
 
 
457
 
        # retrieve data to EOF
458
 
        from_file = FakeReadFile(self.test_data)
459
 
        to_file = StringIO()
460
 
        pumpfile(from_file, to_file, -1, self.block_size)
461
 
 
462
 
        # verify read size was equal to the maximum read size
463
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
464
 
        self.assertEqual(from_file.get_read_count(), 4)
465
 
 
466
 
        # report error if the data wasn't equal (we only report the size due
467
 
        # to the length of the data)
468
 
        response_data = to_file.getvalue()
469
 
        if response_data != self.test_data:
470
 
            message = "Data not equal.  Expected %d bytes, received %d."
471
 
            self.fail(message % (len(response_data), self.test_data_len))
472
 
 
473
 
    def test_defaults(self):
474
 
        """Verifies that the default arguments will read to EOF -- this
475
 
        test verifies that any existing usages of pumpfile will not be broken
476
 
        with this new version."""
477
 
        # retrieve data using default (old) pumpfile method
478
 
        from_file = FakeReadFile(self.test_data)
479
 
        to_file = StringIO()
480
 
        pumpfile(from_file, to_file)
481
 
 
482
 
        # report error if the data wasn't equal (we only report the size due
483
 
        # to the length of the data)
484
 
        response_data = to_file.getvalue()
485
 
        if response_data != self.test_data:
486
 
            message = "Data not equal.  Expected %d bytes, received %d."
487
 
            self.fail(message % (len(response_data), self.test_data_len))
488
 
 
489
 
    def test_report_activity(self):
490
 
        activity = []
491
 
        def log_activity(length, direction):
492
 
            activity.append((length, direction))
493
 
        from_file = StringIO(self.test_data)
494
 
        to_file = StringIO()
495
 
        pumpfile(from_file, to_file, buff_size=500,
496
 
                 report_activity=log_activity, direction='read')
497
 
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
498
 
                          (36, 'read')], activity)
499
 
 
500
 
        from_file = StringIO(self.test_data)
501
 
        to_file = StringIO()
502
 
        del activity[:]
503
 
        pumpfile(from_file, to_file, buff_size=500,
504
 
                 report_activity=log_activity, direction='write')
505
 
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
506
 
                          (36, 'write')], activity)
507
 
 
508
 
        # And with a limited amount of data
509
 
        from_file = StringIO(self.test_data)
510
 
        to_file = StringIO()
511
 
        del activity[:]
512
 
        pumpfile(from_file, to_file, buff_size=500, read_length=1028,
513
 
                 report_activity=log_activity, direction='read')
514
 
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
515
 
 
516
 
 
517
 
 
518
 
class TestPumpStringFile(TestCase):
519
 
 
520
 
    def test_empty(self):
521
 
        output = StringIO()
522
 
        pump_string_file("", output)
523
 
        self.assertEqual("", output.getvalue())
524
 
 
525
 
    def test_more_than_segment_size(self):
526
 
        output = StringIO()
527
 
        pump_string_file("123456789", output, 2)
528
 
        self.assertEqual("123456789", output.getvalue())
529
 
 
530
 
    def test_segment_size(self):
531
 
        output = StringIO()
532
 
        pump_string_file("12", output, 2)
533
 
        self.assertEqual("12", output.getvalue())
534
 
 
535
 
    def test_segment_size_multiple(self):
536
 
        output = StringIO()
537
 
        pump_string_file("1234", output, 2)
538
 
        self.assertEqual("1234", output.getvalue())
539
 
 
540
256
 
541
257
class TestSafeUnicode(TestCase):
542
258
 
581
297
class TestSafeRevisionId(TestCase):
582
298
 
583
299
    def test_from_ascii_string(self):
584
 
        # this shouldn't give a warning because it's getting an ascii string
585
300
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
586
301
 
587
302
    def test_from_unicode_string_ascii_contents(self):
709
424
        #       osutils.getcwd() renormalize the path.
710
425
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
711
426
 
712
 
    def test_minimum_path_selection(self):
713
 
        self.assertEqual(set(),
714
 
            osutils.minimum_path_selection([]))
715
 
        self.assertEqual(set(['a', 'b']),
716
 
            osutils.minimum_path_selection(['a', 'b']))
717
 
        self.assertEqual(set(['a/', 'b']),
718
 
            osutils.minimum_path_selection(['a/', 'b']))
719
 
        self.assertEqual(set(['a/', 'b']),
720
 
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
721
 
 
722
427
    def test_mkdtemp(self):
723
428
        tmpdir = osutils._win32_mkdtemp(dir='.')
724
429
        self.assertFalse('\\' in tmpdir)
804
509
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
805
510
 
806
511
 
807
 
class TestChunksToLines(TestCase):
808
 
 
809
 
    def test_smoketest(self):
810
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
811
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
812
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
813
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
814
 
 
815
 
    def test_osutils_binding(self):
816
 
        from bzrlib.tests import test__chunks_to_lines
817
 
        if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
818
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
819
 
        else:
820
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
821
 
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
822
 
 
823
 
 
824
512
class TestSplitLines(TestCase):
825
513
 
826
514
    def test_split_unicode(self):
937
625
                new_dirblock.append((info[0], info[1], info[2], info[4]))
938
626
            dirblock[:] = new_dirblock
939
627
 
940
 
    def _save_platform_info(self):
941
 
        cur_winver = win32utils.winver
942
 
        cur_fs_enc = osutils._fs_enc
943
 
        cur_dir_reader = osutils._selected_dir_reader
944
 
        def restore():
945
 
            win32utils.winver = cur_winver
946
 
            osutils._fs_enc = cur_fs_enc
947
 
            osutils._selected_dir_reader = cur_dir_reader
948
 
        self.addCleanup(restore)
949
 
 
950
 
    def assertReadFSDirIs(self, expected):
951
 
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
952
 
        # Force it to redetect
953
 
        osutils._selected_dir_reader = None
954
 
        # Nothing to list, but should still trigger the selection logic
955
 
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
956
 
        self.assertIsInstance(osutils._selected_dir_reader, expected)
957
 
 
958
 
    def test_force_walkdirs_utf8_fs_utf8(self):
959
 
        self.requireFeature(UTF8DirReaderFeature)
960
 
        self._save_platform_info()
961
 
        win32utils.winver = None # Avoid the win32 detection code
962
 
        osutils._fs_enc = 'UTF-8'
963
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
964
 
 
965
 
    def test_force_walkdirs_utf8_fs_ascii(self):
966
 
        self.requireFeature(UTF8DirReaderFeature)
967
 
        self._save_platform_info()
968
 
        win32utils.winver = None # Avoid the win32 detection code
969
 
        osutils._fs_enc = 'US-ASCII'
970
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
971
 
 
972
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
973
 
        self.requireFeature(UTF8DirReaderFeature)
974
 
        self._save_platform_info()
975
 
        win32utils.winver = None # Avoid the win32 detection code
976
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
977
 
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
978
 
 
979
 
    def test_force_walkdirs_utf8_fs_latin1(self):
980
 
        self._save_platform_info()
981
 
        win32utils.winver = None # Avoid the win32 detection code
982
 
        osutils._fs_enc = 'latin1'
983
 
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
984
 
 
985
 
    def test_force_walkdirs_utf8_nt(self):
986
 
        # Disabled because the thunk of the whole walkdirs api is disabled.
987
 
        self.requireFeature(Win32ReadDirFeature)
988
 
        self._save_platform_info()
989
 
        win32utils.winver = 'Windows NT'
990
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
991
 
        self.assertReadFSDirIs(Win32ReadDir)
992
 
 
993
 
    def test_force_walkdirs_utf8_98(self):
994
 
        self.requireFeature(Win32ReadDirFeature)
995
 
        self._save_platform_info()
996
 
        win32utils.winver = 'Windows 98'
997
 
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
998
 
 
999
628
    def test_unicode_walkdirs(self):
1000
629
        """Walkdirs should always return unicode paths."""
1001
630
        name0 = u'0file-\xb6'
1103
732
            result.append((dirdetail, new_dirblock))
1104
733
        self.assertEqual(expected_dirblocks, result)
1105
734
 
1106
 
    def test__walkdirs_utf8_with_unicode_fs(self):
1107
 
        """UnicodeDirReader should be a safe fallback everywhere
 
735
    def test_unicode__walkdirs_unicode_to_utf8(self):
 
736
        """walkdirs_unicode_to_utf8 should be a safe fallback everywhere
1108
737
 
1109
738
        The abspath portion should be in unicode
1110
739
        """
1111
 
        # Use the unicode reader. TODO: split into driver-and-driven unit
1112
 
        # tests.
1113
 
        self._save_platform_info()
1114
 
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
1115
740
        name0u = u'0file-\xb6'
1116
741
        name1u = u'1dir-\u062c\u0648'
1117
742
        name2u = u'2file-\u0633'
1152
777
                 ]
1153
778
                ),
1154
779
            ]
1155
 
        result = list(osutils._walkdirs_utf8('.'))
1156
 
        self._filter_out_stat(result)
1157
 
        self.assertEqual(expected_dirblocks, result)
1158
 
 
1159
 
    def test__walkdirs_utf8_win32readdir(self):
1160
 
        self.requireFeature(Win32ReadDirFeature)
1161
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1162
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1163
 
        self._save_platform_info()
1164
 
        osutils._selected_dir_reader = Win32ReadDir()
1165
 
        name0u = u'0file-\xb6'
1166
 
        name1u = u'1dir-\u062c\u0648'
1167
 
        name2u = u'2file-\u0633'
1168
 
        tree = [
1169
 
            name0u,
1170
 
            name1u + '/',
1171
 
            name1u + '/' + name0u,
1172
 
            name1u + '/' + name1u + '/',
1173
 
            name2u,
1174
 
            ]
1175
 
        self.build_tree(tree)
1176
 
        name0 = name0u.encode('utf8')
1177
 
        name1 = name1u.encode('utf8')
1178
 
        name2 = name2u.encode('utf8')
1179
 
 
1180
 
        # All of the abspaths should be in unicode, all of the relative paths
1181
 
        # should be in utf8
1182
 
        expected_dirblocks = [
1183
 
                (('', '.'),
1184
 
                 [(name0, name0, 'file', './' + name0u),
1185
 
                  (name1, name1, 'directory', './' + name1u),
1186
 
                  (name2, name2, 'file', './' + name2u),
1187
 
                 ]
1188
 
                ),
1189
 
                ((name1, './' + name1u),
1190
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1191
 
                                                        + '/' + name0u),
1192
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1193
 
                                                            + '/' + name1u),
1194
 
                 ]
1195
 
                ),
1196
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1197
 
                 [
1198
 
                 ]
1199
 
                ),
1200
 
            ]
1201
 
        result = list(osutils._walkdirs_utf8(u'.'))
1202
 
        self._filter_out_stat(result)
1203
 
        self.assertEqual(expected_dirblocks, result)
1204
 
 
1205
 
    def assertStatIsCorrect(self, path, win32stat):
1206
 
        os_stat = os.stat(path)
1207
 
        self.assertEqual(os_stat.st_size, win32stat.st_size)
1208
 
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1209
 
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1210
 
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1211
 
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1212
 
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1213
 
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1214
 
 
1215
 
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1216
 
        """make sure our Stat values are valid"""
1217
 
        self.requireFeature(Win32ReadDirFeature)
1218
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1219
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1220
 
        name0u = u'0file-\xb6'
1221
 
        name0 = name0u.encode('utf8')
1222
 
        self.build_tree([name0u])
1223
 
        # I hate to sleep() here, but I'm trying to make the ctime different
1224
 
        # from the mtime
1225
 
        time.sleep(2)
1226
 
        f = open(name0u, 'ab')
1227
 
        try:
1228
 
            f.write('just a small update')
1229
 
        finally:
1230
 
            f.close()
1231
 
 
1232
 
        result = Win32ReadDir().read_dir('', u'.')
1233
 
        entry = result[0]
1234
 
        self.assertEqual((name0, name0, 'file'), entry[:3])
1235
 
        self.assertEqual(u'./' + name0u, entry[4])
1236
 
        self.assertStatIsCorrect(entry[4], entry[3])
1237
 
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1238
 
 
1239
 
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1240
 
        """make sure our Stat values are valid"""
1241
 
        self.requireFeature(Win32ReadDirFeature)
1242
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1243
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1244
 
        name0u = u'0dir-\u062c\u0648'
1245
 
        name0 = name0u.encode('utf8')
1246
 
        self.build_tree([name0u + '/'])
1247
 
 
1248
 
        result = Win32ReadDir().read_dir('', u'.')
1249
 
        entry = result[0]
1250
 
        self.assertEqual((name0, name0, 'directory'), entry[:3])
1251
 
        self.assertEqual(u'./' + name0u, entry[4])
1252
 
        self.assertStatIsCorrect(entry[4], entry[3])
 
780
        result = list(osutils._walkdirs_unicode_to_utf8('.'))
 
781
        self._filter_out_stat(result)
 
782
        self.assertEqual(expected_dirblocks, result)
1253
783
 
1254
784
    def assertPathCompare(self, path_less, path_greater):
1255
785
        """check that path_less and path_greater compare correctly."""
1345
875
        self.assertEqual(['c'], os.listdir('target/b'))
1346
876
 
1347
877
    def test_copy_tree_symlinks(self):
1348
 
        self.requireFeature(SymlinkFeature)
 
878
        if not osutils.has_symlinks():
 
879
            return
1349
880
        self.build_tree(['source/'])
1350
881
        os.symlink('a/generic/path', 'source/lnk')
1351
882
        osutils.copy_tree('source', 'target')
1418
949
        
1419
950
        So Unicode strings must be encoded.
1420
951
        """
1421
 
        uni_val, env_val = probe_unicode_in_user_encoding()
1422
 
        if uni_val is None:
 
952
        # Try a few different characters, to see if we can get
 
953
        # one that will be valid in the user_encoding
 
954
        possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
955
        for uni_val in possible_vals:
 
956
            try:
 
957
                env_val = uni_val.encode(bzrlib.user_encoding)
 
958
            except UnicodeEncodeError:
 
959
                # Try a different character
 
960
                pass
 
961
            else:
 
962
                break
 
963
        else:
1423
964
            raise TestSkipped('Cannot find a unicode character that works in'
1424
 
                              ' encoding %s' % (osutils.get_user_encoding(),))
 
965
                              ' encoding %s' % (bzrlib.user_encoding,))
1425
966
 
1426
967
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1427
968
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1455
996
        self.assertTrue(isinstance(offset, int))
1456
997
        eighteen_hours = 18 * 3600
1457
998
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1458
 
 
1459
 
 
1460
 
class TestShaFileByName(TestCaseInTempDir):
1461
 
 
1462
 
    def test_sha_empty(self):
1463
 
        self.build_tree_contents([('foo', '')])
1464
 
        expected_sha = osutils.sha_string('')
1465
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1466
 
 
1467
 
    def test_sha_mixed_endings(self):
1468
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1469
 
        self.build_tree_contents([('foo', text)])
1470
 
        expected_sha = osutils.sha_string(text)
1471
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1472
 
 
1473
 
 
1474
 
class TestResourceLoading(TestCaseInTempDir):
1475
 
 
1476
 
    def test_resource_string(self):
1477
 
        # test resource in bzrlib
1478
 
        text = osutils.resource_string('bzrlib', 'debug.py')
1479
 
        self.assertContainsRe(text, "debug_flags = set()")
1480
 
        # test resource under bzrlib
1481
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1482
 
        self.assertContainsRe(text, "class TextUIFactory")
1483
 
        # test unsupported package
1484
 
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1485
 
            'yyy.xx')
1486
 
        # test unknown resource
1487
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')