~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Robert Collins
  • Date: 2007-04-19 02:27:44 UTC
  • mto: This revision was merged to the branch mainline in revision 2426.
  • Revision ID: robertc@robertcollins.net-20070419022744-pfdqz42kp1wizh43
``make docs`` now creates a man page at ``man1/bzr.1`` fixing bug 107388.
(Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
 
from cStringIO import StringIO
20
19
import errno
21
20
import os
22
 
import re
23
21
import socket
 
22
import stat
24
23
import sys
25
 
import time
26
24
 
 
25
import bzrlib
27
26
from bzrlib import (
28
27
    errors,
29
 
    lazy_regex,
30
28
    osutils,
31
 
    symbol_versioning,
32
 
    tests,
33
 
    trace,
34
29
    win32utils,
35
30
    )
 
31
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
36
32
from bzrlib.tests import (
37
 
    features,
38
 
    file_utils,
39
 
    test__walkdirs_win32,
40
 
    )
41
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
42
 
 
43
 
 
44
 
class _UTF8DirReaderFeature(tests.Feature):
45
 
 
46
 
    def _probe(self):
47
 
        try:
48
 
            from bzrlib import _readdir_pyx
49
 
            self.reader = _readdir_pyx.UTF8DirReader
50
 
            return True
51
 
        except ImportError:
52
 
            return False
53
 
 
54
 
    def feature_name(self):
55
 
        return 'bzrlib._readdir_pyx'
56
 
 
57
 
UTF8DirReaderFeature = _UTF8DirReaderFeature()
58
 
 
59
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
60
 
 
61
 
 
62
 
def _already_unicode(s):
63
 
    return s
64
 
 
65
 
 
66
 
def _utf8_to_unicode(s):
67
 
    return s.decode('UTF-8')
68
 
 
69
 
 
70
 
def dir_reader_scenarios():
71
 
    # For each dir reader we define:
72
 
 
73
 
    # - native_to_unicode: a function converting the native_abspath as returned
74
 
    #   by DirReader.read_dir to its unicode representation
75
 
 
76
 
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
77
 
    scenarios = [('unicode',
78
 
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
79
 
                       _native_to_unicode=_already_unicode))]
80
 
    # Some DirReaders are platform specific and even there they may not be
81
 
    # available.
82
 
    if UTF8DirReaderFeature.available():
83
 
        from bzrlib import _readdir_pyx
84
 
        scenarios.append(('utf8',
85
 
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
86
 
                               _native_to_unicode=_utf8_to_unicode)))
87
 
 
88
 
    if test__walkdirs_win32.win32_readdir_feature.available():
89
 
        try:
90
 
            from bzrlib import _walkdirs_win32
91
 
            scenarios.append(
92
 
                ('win32',
93
 
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
94
 
                      _native_to_unicode=_already_unicode)))
95
 
        except ImportError:
96
 
            pass
97
 
    return scenarios
98
 
 
99
 
 
100
 
load_tests = load_tests_apply_scenarios
101
 
 
102
 
 
103
 
class TestContainsWhitespace(tests.TestCase):
 
33
        StringIOWrapper,
 
34
        TestCase, 
 
35
        TestCaseInTempDir, 
 
36
        TestSkipped,
 
37
        )
 
38
 
 
39
 
 
40
class TestOSUtils(TestCaseInTempDir):
104
41
 
105
42
    def test_contains_whitespace(self):
106
43
        self.failUnless(osutils.contains_whitespace(u' '))
116
53
        self.failIf(osutils.contains_whitespace(u'hellothere'))
117
54
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
118
55
 
119
 
 
120
 
class TestRename(tests.TestCaseInTempDir):
121
 
 
122
 
    def create_file(self, filename, content):
123
 
        f = open(filename, 'wb')
124
 
        try:
125
 
            f.write(content)
126
 
        finally:
127
 
            f.close()
128
 
 
129
 
    def _fancy_rename(self, a, b):
130
 
        osutils.fancy_rename(a, b, rename_func=os.rename,
131
 
                             unlink_func=os.unlink)
132
 
 
133
56
    def test_fancy_rename(self):
134
57
        # This should work everywhere
135
 
        self.create_file('a', 'something in a\n')
136
 
        self._fancy_rename('a', 'b')
 
58
        def rename(a, b):
 
59
            osutils.fancy_rename(a, b,
 
60
                    rename_func=os.rename,
 
61
                    unlink_func=os.unlink)
 
62
 
 
63
        open('a', 'wb').write('something in a\n')
 
64
        rename('a', 'b')
137
65
        self.failIfExists('a')
138
66
        self.failUnlessExists('b')
139
67
        self.check_file_contents('b', 'something in a\n')
140
68
 
141
 
        self.create_file('a', 'new something in a\n')
142
 
        self._fancy_rename('b', 'a')
 
69
        open('a', 'wb').write('new something in a\n')
 
70
        rename('b', 'a')
143
71
 
144
72
        self.check_file_contents('a', 'something in a\n')
145
73
 
146
 
    def test_fancy_rename_fails_source_missing(self):
147
 
        # An exception should be raised, and the target should be left in place
148
 
        self.create_file('target', 'data in target\n')
149
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
150
 
                          'missingsource', 'target')
151
 
        self.failUnlessExists('target')
152
 
        self.check_file_contents('target', 'data in target\n')
153
 
 
154
 
    def test_fancy_rename_fails_if_source_and_target_missing(self):
155
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
156
 
                          'missingsource', 'missingtarget')
157
 
 
158
74
    def test_rename(self):
159
75
        # Rename should be semi-atomic on all platforms
160
 
        self.create_file('a', 'something in a\n')
 
76
        open('a', 'wb').write('something in a\n')
161
77
        osutils.rename('a', 'b')
162
78
        self.failIfExists('a')
163
79
        self.failUnlessExists('b')
164
80
        self.check_file_contents('b', 'something in a\n')
165
81
 
166
 
        self.create_file('a', 'new something in a\n')
 
82
        open('a', 'wb').write('new something in a\n')
167
83
        osutils.rename('b', 'a')
168
84
 
169
85
        self.check_file_contents('a', 'something in a\n')
170
86
 
171
87
    # TODO: test fancy_rename using a MemoryTransport
172
88
 
173
 
    def test_rename_change_case(self):
174
 
        # on Windows we should be able to change filename case by rename
175
 
        self.build_tree(['a', 'b/'])
176
 
        osutils.rename('a', 'A')
177
 
        osutils.rename('b', 'B')
178
 
        # we can't use failUnlessExists on case-insensitive filesystem
179
 
        # so try to check shape of the tree
180
 
        shape = sorted(os.listdir('.'))
181
 
        self.assertEquals(['A', 'B'], shape)
182
 
 
183
 
 
184
 
class TestRandChars(tests.TestCase):
185
 
 
186
89
    def test_01_rand_chars_empty(self):
187
90
        result = osutils.rand_chars(0)
188
91
        self.assertEqual(result, '')
193
96
        self.assertEqual(type(result), str)
194
97
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
195
98
 
196
 
 
197
 
class TestIsInside(tests.TestCase):
198
 
 
199
99
    def test_is_inside(self):
200
100
        is_inside = osutils.is_inside
201
101
        self.assertTrue(is_inside('src', 'src/foo.c'))
205
105
        self.assertFalse(is_inside('foo.c', ''))
206
106
        self.assertTrue(is_inside('', 'foo.c'))
207
107
 
208
 
    def test_is_inside_any(self):
209
 
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
210
 
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
211
 
                         (['src'], SRC_FOO_C),
212
 
                         (['src'], 'src'),
213
 
                         ]:
214
 
            self.assert_(osutils.is_inside_any(dirs, fn))
215
 
        for dirs, fn in [(['src'], 'srccontrol'),
216
 
                         (['src'], 'srccontrol/foo')]:
217
 
            self.assertFalse(osutils.is_inside_any(dirs, fn))
218
 
 
219
 
    def test_is_inside_or_parent_of_any(self):
220
 
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
221
 
                         (['src'], 'src/foo.c'),
222
 
                         (['src/bar.c'], 'src'),
223
 
                         (['src/bar.c', 'bla/foo.c'], 'src'),
224
 
                         (['src'], 'src'),
225
 
                         ]:
226
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
227
 
 
228
 
        for dirs, fn in [(['src'], 'srccontrol'),
229
 
                         (['srccontrol/foo.c'], 'src'),
230
 
                         (['src'], 'srccontrol/foo')]:
231
 
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
232
 
 
233
 
 
234
 
class TestRmTree(tests.TestCaseInTempDir):
235
 
 
236
108
    def test_rmtree(self):
237
109
        # Check to remove tree with read-only files/dirs
238
110
        os.mkdir('dir')
251
123
        self.failIfExists('dir/file')
252
124
        self.failIfExists('dir')
253
125
 
254
 
 
255
 
class TestDeleteAny(tests.TestCaseInTempDir):
256
 
 
257
 
    def test_delete_any_readonly(self):
258
 
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
259
 
        self.build_tree(['d/', 'f'])
260
 
        osutils.make_readonly('d')
261
 
        osutils.make_readonly('f')
262
 
 
263
 
        osutils.delete_any('f')
264
 
        osutils.delete_any('d')
265
 
 
266
 
 
267
 
class TestKind(tests.TestCaseInTempDir):
268
 
 
269
126
    def test_file_kind(self):
270
127
        self.build_tree(['file', 'dir/'])
271
128
        self.assertEquals('file', osutils.file_kind('file'))
273
130
        if osutils.has_symlinks():
274
131
            os.symlink('symlink', 'symlink')
275
132
            self.assertEquals('symlink', osutils.file_kind('symlink'))
276
 
 
 
133
        
277
134
        # TODO: jam 20060529 Test a block device
278
135
        try:
279
136
            os.lstat('/dev/null')
301
158
                os.remove('socket')
302
159
 
303
160
    def test_kind_marker(self):
304
 
        self.assertEqual("", osutils.kind_marker("file"))
305
 
        self.assertEqual("/", osutils.kind_marker('directory'))
306
 
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
307
 
        self.assertEqual("@", osutils.kind_marker("symlink"))
308
 
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
309
 
        self.assertEqual("", osutils.kind_marker("fifo"))
310
 
        self.assertEqual("", osutils.kind_marker("socket"))
311
 
        self.assertEqual("", osutils.kind_marker("unknown"))
312
 
 
313
 
 
314
 
class TestUmask(tests.TestCaseInTempDir):
 
161
        self.assertEqual(osutils.kind_marker('file'), '')
 
162
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
163
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
164
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
315
165
 
316
166
    def test_get_umask(self):
317
167
        if sys.platform == 'win32':
320
170
            return
321
171
 
322
172
        orig_umask = osutils.get_umask()
323
 
        self.addCleanup(os.umask, orig_umask)
324
 
        os.umask(0222)
325
 
        self.assertEqual(0222, osutils.get_umask())
326
 
        os.umask(0022)
327
 
        self.assertEqual(0022, osutils.get_umask())
328
 
        os.umask(0002)
329
 
        self.assertEqual(0002, osutils.get_umask())
330
 
        os.umask(0027)
331
 
        self.assertEqual(0027, osutils.get_umask())
332
 
 
333
 
 
334
 
class TestDateTime(tests.TestCase):
 
173
        try:
 
174
            os.umask(0222)
 
175
            self.assertEqual(0222, osutils.get_umask())
 
176
            os.umask(0022)
 
177
            self.assertEqual(0022, osutils.get_umask())
 
178
            os.umask(0002)
 
179
            self.assertEqual(0002, osutils.get_umask())
 
180
            os.umask(0027)
 
181
            self.assertEqual(0027, osutils.get_umask())
 
182
        finally:
 
183
            os.umask(orig_umask)
335
184
 
336
185
    def assertFormatedDelta(self, expected, seconds):
337
186
        """Assert osutils.format_delta formats as expected"""
369
218
        self.assertFormatedDelta('1 second in the future', -1)
370
219
        self.assertFormatedDelta('2 seconds in the future', -2)
371
220
 
372
 
    def test_format_date(self):
373
 
        self.assertRaises(errors.UnsupportedTimezoneFormat,
374
 
            osutils.format_date, 0, timezone='foo')
375
 
        self.assertIsInstance(osutils.format_date(0), str)
376
 
        self.assertIsInstance(osutils.format_local_date(0), unicode)
377
 
        # Testing for the actual value of the local weekday without
378
 
        # duplicating the code from format_date is difficult.
379
 
        # Instead blackbox.test_locale should check for localized
380
 
        # dates once they do occur in output strings.
381
 
 
382
 
    def test_format_date_with_offset_in_original_timezone(self):
383
 
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
384
 
            osutils.format_date_with_offset_in_original_timezone(0))
385
 
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
386
 
            osutils.format_date_with_offset_in_original_timezone(100000))
387
 
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
388
 
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
389
 
 
390
 
    def test_local_time_offset(self):
391
 
        """Test that local_time_offset() returns a sane value."""
392
 
        offset = osutils.local_time_offset()
393
 
        self.assertTrue(isinstance(offset, int))
394
 
        # Test that the offset is no more than a eighteen hours in
395
 
        # either direction.
396
 
        # Time zone handling is system specific, so it is difficult to
397
 
        # do more specific tests, but a value outside of this range is
398
 
        # probably wrong.
399
 
        eighteen_hours = 18 * 3600
400
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
401
 
 
402
 
    def test_local_time_offset_with_timestamp(self):
403
 
        """Test that local_time_offset() works with a timestamp."""
404
 
        offset = osutils.local_time_offset(1000000000.1234567)
405
 
        self.assertTrue(isinstance(offset, int))
406
 
        eighteen_hours = 18 * 3600
407
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
408
 
 
409
 
 
410
 
class TestLinks(tests.TestCaseInTempDir):
411
 
 
412
221
    def test_dereference_path(self):
413
 
        self.requireFeature(tests.SymlinkFeature)
 
222
        if not osutils.has_symlinks():
 
223
            raise TestSkipped('Symlinks are not supported on this platform')
414
224
        cwd = osutils.realpath('.')
415
225
        os.mkdir('bar')
416
226
        bar_path = osutils.pathjoin(cwd, 'bar')
419
229
        self.assertEqual(bar_path, osutils.realpath('./bar'))
420
230
        os.symlink('bar', 'foo')
421
231
        self.assertEqual(bar_path, osutils.realpath('./foo'))
422
 
 
 
232
        
423
233
        # Does not dereference terminal symlinks
424
234
        foo_path = osutils.pathjoin(cwd, 'foo')
425
235
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
436
246
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
437
247
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
438
248
 
439
 
    def test_changing_access(self):
440
 
        f = file('file', 'w')
441
 
        f.write('monkey')
442
 
        f.close()
443
 
 
444
 
        # Make a file readonly
445
 
        osutils.make_readonly('file')
446
 
        mode = os.lstat('file').st_mode
447
 
        self.assertEqual(mode, mode & 0777555)
448
 
 
449
 
        # Make a file writable
450
 
        osutils.make_writable('file')
451
 
        mode = os.lstat('file').st_mode
452
 
        self.assertEqual(mode, mode | 0200)
453
 
 
454
 
        if osutils.has_symlinks():
455
 
            # should not error when handed a symlink
456
 
            os.symlink('nonexistent', 'dangling')
457
 
            osutils.make_readonly('dangling')
458
 
            osutils.make_writable('dangling')
459
 
 
460
 
    def test_host_os_dereferences_symlinks(self):
461
 
        osutils.host_os_dereferences_symlinks()
462
 
 
463
 
 
464
 
class TestCanonicalRelPath(tests.TestCaseInTempDir):
465
 
 
466
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
467
 
 
468
 
    def test_canonical_relpath_simple(self):
469
 
        f = file('MixedCaseName', 'w')
470
 
        f.close()
471
 
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
472
 
        self.failUnlessEqual('work/MixedCaseName', actual)
473
 
 
474
 
    def test_canonical_relpath_missing_tail(self):
475
 
        os.mkdir('MixedCaseParent')
476
 
        actual = osutils.canonical_relpath(self.test_base_dir,
477
 
                                           'mixedcaseparent/nochild')
478
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
479
 
 
480
 
 
481
 
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
482
 
 
483
 
    def assertRelpath(self, expected, base, path):
484
 
        actual = osutils._cicp_canonical_relpath(base, path)
485
 
        self.assertEqual(expected, actual)
486
 
 
487
 
    def test_simple(self):
488
 
        self.build_tree(['MixedCaseName'])
489
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
490
 
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
491
 
 
492
 
    def test_subdir_missing_tail(self):
493
 
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
494
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
495
 
        self.assertRelpath('MixedCaseParent/a_child', base,
496
 
                           'MixedCaseParent/a_child')
497
 
        self.assertRelpath('MixedCaseParent/a_child', base,
498
 
                           'MixedCaseParent/A_Child')
499
 
        self.assertRelpath('MixedCaseParent/not_child', base,
500
 
                           'MixedCaseParent/not_child')
501
 
 
502
 
    def test_at_root_slash(self):
503
 
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
504
 
        # check...
505
 
        if osutils.MIN_ABS_PATHLENGTH > 1:
506
 
            raise tests.TestSkipped('relpath requires %d chars'
507
 
                                    % osutils.MIN_ABS_PATHLENGTH)
508
 
        self.assertRelpath('foo', '/', '/foo')
509
 
 
510
 
    def test_at_root_drive(self):
511
 
        if sys.platform != 'win32':
512
 
            raise tests.TestNotApplicable('we can only test drive-letter relative'
513
 
                                          ' paths on Windows where we have drive'
514
 
                                          ' letters.')
515
 
        # see bug #322807
516
 
        # The specific issue is that when at the root of a drive, 'abspath'
517
 
        # returns "C:/" or just "/". However, the code assumes that abspath
518
 
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
519
 
        self.assertRelpath('foo', 'C:/', 'C:/foo')
520
 
        self.assertRelpath('foo', 'X:/', 'X:/foo')
521
 
        self.assertRelpath('foo', 'X:/', 'X://foo')
522
 
 
523
 
 
524
 
class TestPumpFile(tests.TestCase):
525
 
    """Test pumpfile method."""
526
 
 
527
 
    def setUp(self):
528
 
        tests.TestCase.setUp(self)
529
 
        # create a test datablock
530
 
        self.block_size = 512
531
 
        pattern = '0123456789ABCDEF'
532
 
        self.test_data = pattern * (3 * self.block_size / len(pattern))
533
 
        self.test_data_len = len(self.test_data)
534
 
 
535
 
    def test_bracket_block_size(self):
536
 
        """Read data in blocks with the requested read size bracketing the
537
 
        block size."""
538
 
        # make sure test data is larger than max read size
539
 
        self.assertTrue(self.test_data_len > self.block_size)
540
 
 
541
 
        from_file = file_utils.FakeReadFile(self.test_data)
542
 
        to_file = StringIO()
543
 
 
544
 
        # read (max / 2) bytes and verify read size wasn't affected
545
 
        num_bytes_to_read = self.block_size / 2
546
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
547
 
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
548
 
        self.assertEqual(from_file.get_read_count(), 1)
549
 
 
550
 
        # read (max) bytes and verify read size wasn't affected
551
 
        num_bytes_to_read = self.block_size
552
 
        from_file.reset_read_count()
553
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
554
 
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
555
 
        self.assertEqual(from_file.get_read_count(), 1)
556
 
 
557
 
        # read (max + 1) bytes and verify read size was limited
558
 
        num_bytes_to_read = self.block_size + 1
559
 
        from_file.reset_read_count()
560
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
561
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
562
 
        self.assertEqual(from_file.get_read_count(), 2)
563
 
 
564
 
        # finish reading the rest of the data
565
 
        num_bytes_to_read = self.test_data_len - to_file.tell()
566
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
567
 
 
568
 
        # report error if the data wasn't equal (we only report the size due
569
 
        # to the length of the data)
570
 
        response_data = to_file.getvalue()
571
 
        if response_data != self.test_data:
572
 
            message = "Data not equal.  Expected %d bytes, received %d."
573
 
            self.fail(message % (len(response_data), self.test_data_len))
574
 
 
575
 
    def test_specified_size(self):
576
 
        """Request a transfer larger than the maximum block size and verify
577
 
        that the maximum read doesn't exceed the block_size."""
578
 
        # make sure test data is larger than max read size
579
 
        self.assertTrue(self.test_data_len > self.block_size)
580
 
 
581
 
        # retrieve data in blocks
582
 
        from_file = file_utils.FakeReadFile(self.test_data)
583
 
        to_file = StringIO()
584
 
        osutils.pumpfile(from_file, to_file, self.test_data_len,
585
 
                         self.block_size)
586
 
 
587
 
        # verify read size was equal to the maximum read size
588
 
        self.assertTrue(from_file.get_max_read_size() > 0)
589
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
590
 
        self.assertEqual(from_file.get_read_count(), 3)
591
 
 
592
 
        # report error if the data wasn't equal (we only report the size due
593
 
        # to the length of the data)
594
 
        response_data = to_file.getvalue()
595
 
        if response_data != self.test_data:
596
 
            message = "Data not equal.  Expected %d bytes, received %d."
597
 
            self.fail(message % (len(response_data), self.test_data_len))
598
 
 
599
 
    def test_to_eof(self):
600
 
        """Read to end-of-file and verify that the reads are not larger than
601
 
        the maximum read size."""
602
 
        # make sure test data is larger than max read size
603
 
        self.assertTrue(self.test_data_len > self.block_size)
604
 
 
605
 
        # retrieve data to EOF
606
 
        from_file = file_utils.FakeReadFile(self.test_data)
607
 
        to_file = StringIO()
608
 
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
609
 
 
610
 
        # verify read size was equal to the maximum read size
611
 
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
612
 
        self.assertEqual(from_file.get_read_count(), 4)
613
 
 
614
 
        # report error if the data wasn't equal (we only report the size due
615
 
        # to the length of the data)
616
 
        response_data = to_file.getvalue()
617
 
        if response_data != self.test_data:
618
 
            message = "Data not equal.  Expected %d bytes, received %d."
619
 
            self.fail(message % (len(response_data), self.test_data_len))
620
 
 
621
 
    def test_defaults(self):
622
 
        """Verifies that the default arguments will read to EOF -- this
623
 
        test verifies that any existing usages of pumpfile will not be broken
624
 
        with this new version."""
625
 
        # retrieve data using default (old) pumpfile method
626
 
        from_file = file_utils.FakeReadFile(self.test_data)
627
 
        to_file = StringIO()
628
 
        osutils.pumpfile(from_file, to_file)
629
 
 
630
 
        # report error if the data wasn't equal (we only report the size due
631
 
        # to the length of the data)
632
 
        response_data = to_file.getvalue()
633
 
        if response_data != self.test_data:
634
 
            message = "Data not equal.  Expected %d bytes, received %d."
635
 
            self.fail(message % (len(response_data), self.test_data_len))
636
 
 
637
 
    def test_report_activity(self):
638
 
        activity = []
639
 
        def log_activity(length, direction):
640
 
            activity.append((length, direction))
641
 
        from_file = StringIO(self.test_data)
642
 
        to_file = StringIO()
643
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
644
 
                         report_activity=log_activity, direction='read')
645
 
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
646
 
                          (36, 'read')], activity)
647
 
 
648
 
        from_file = StringIO(self.test_data)
649
 
        to_file = StringIO()
650
 
        del activity[:]
651
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
652
 
                         report_activity=log_activity, direction='write')
653
 
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
654
 
                          (36, 'write')], activity)
655
 
 
656
 
        # And with a limited amount of data
657
 
        from_file = StringIO(self.test_data)
658
 
        to_file = StringIO()
659
 
        del activity[:]
660
 
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
661
 
                         report_activity=log_activity, direction='read')
662
 
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
663
 
 
664
 
 
665
 
 
666
 
class TestPumpStringFile(tests.TestCase):
667
 
 
668
 
    def test_empty(self):
669
 
        output = StringIO()
670
 
        osutils.pump_string_file("", output)
671
 
        self.assertEqual("", output.getvalue())
672
 
 
673
 
    def test_more_than_segment_size(self):
674
 
        output = StringIO()
675
 
        osutils.pump_string_file("123456789", output, 2)
676
 
        self.assertEqual("123456789", output.getvalue())
677
 
 
678
 
    def test_segment_size(self):
679
 
        output = StringIO()
680
 
        osutils.pump_string_file("12", output, 2)
681
 
        self.assertEqual("12", output.getvalue())
682
 
 
683
 
    def test_segment_size_multiple(self):
684
 
        output = StringIO()
685
 
        osutils.pump_string_file("1234", output, 2)
686
 
        self.assertEqual("1234", output.getvalue())
687
 
 
688
 
 
689
 
class TestRelpath(tests.TestCase):
690
 
 
691
 
    def test_simple_relpath(self):
692
 
        cwd = osutils.getcwd()
693
 
        subdir = cwd + '/subdir'
694
 
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
695
 
 
696
 
    def test_deep_relpath(self):
697
 
        cwd = osutils.getcwd()
698
 
        subdir = cwd + '/sub/subsubdir'
699
 
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
700
 
 
701
 
    def test_not_relative(self):
702
 
        self.assertRaises(errors.PathNotChild,
703
 
                          osutils.relpath, 'C:/path', 'H:/path')
704
 
        self.assertRaises(errors.PathNotChild,
705
 
                          osutils.relpath, 'C:/', 'H:/path')
706
 
 
707
 
 
708
 
class TestSafeUnicode(tests.TestCase):
 
249
 
 
250
    def test_kind_marker(self):
 
251
        self.assertEqual("", osutils.kind_marker("file"))
 
252
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
253
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
254
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
255
 
 
256
 
 
257
class TestSafeUnicode(TestCase):
709
258
 
710
259
    def test_from_ascii_string(self):
711
260
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
720
269
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
721
270
 
722
271
    def test_bad_utf8_string(self):
723
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
272
        self.assertRaises(BzrBadParameterNotUnicode,
724
273
                          osutils.safe_unicode,
725
274
                          '\xbb\xbb')
726
275
 
727
276
 
728
 
class TestSafeUtf8(tests.TestCase):
 
277
class TestSafeUtf8(TestCase):
729
278
 
730
279
    def test_from_ascii_string(self):
731
280
        f = 'foobar'
741
290
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
742
291
 
743
292
    def test_bad_utf8_string(self):
744
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
293
        self.assertRaises(BzrBadParameterNotUnicode,
745
294
                          osutils.safe_utf8, '\xbb\xbb')
746
295
 
747
296
 
748
 
class TestSafeRevisionId(tests.TestCase):
 
297
class TestSafeRevisionId(TestCase):
749
298
 
750
299
    def test_from_ascii_string(self):
751
 
        # this shouldn't give a warning because it's getting an ascii string
752
300
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
753
301
 
754
302
    def test_from_unicode_string_ascii_contents(self):
773
321
        self.assertEqual(None, osutils.safe_revision_id(None))
774
322
 
775
323
 
776
 
class TestSafeFileId(tests.TestCase):
 
324
class TestSafeFileId(TestCase):
777
325
 
778
326
    def test_from_ascii_string(self):
779
327
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
799
347
        self.assertEqual(None, osutils.safe_file_id(None))
800
348
 
801
349
 
802
 
class TestWin32Funcs(tests.TestCase):
803
 
    """Test that _win32 versions of os utilities return appropriate paths."""
 
350
class TestWin32Funcs(TestCase):
 
351
    """Test that the _win32 versions of os utilities return appropriate paths."""
804
352
 
805
353
    def test_abspath(self):
806
354
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
813
361
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
814
362
 
815
363
    def test_pathjoin(self):
816
 
        self.assertEqual('path/to/foo',
817
 
                         osutils._win32_pathjoin('path', 'to', 'foo'))
818
 
        self.assertEqual('C:/foo',
819
 
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
820
 
        self.assertEqual('C:/foo',
821
 
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
822
 
        self.assertEqual('path/to/foo',
823
 
                         osutils._win32_pathjoin('path/to/', 'foo'))
824
 
        self.assertEqual('/foo',
825
 
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
826
 
        self.assertEqual('/foo',
827
 
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
364
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
365
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
366
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
367
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
368
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
369
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
828
370
 
829
371
    def test_normpath(self):
830
 
        self.assertEqual('path/to/foo',
831
 
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
832
 
        self.assertEqual('path/to/foo',
833
 
                         osutils._win32_normpath('path//from/../to/./foo'))
 
372
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
373
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
834
374
 
835
375
    def test_getcwd(self):
836
376
        cwd = osutils._win32_getcwd()
857
397
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
858
398
        # relative path
859
399
        cwd = osutils.getcwd().rstrip('/')
860
 
        drive = osutils.ntpath.splitdrive(cwd)[0]
 
400
        drive = osutils._nt_splitdrive(cwd)[0]
861
401
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
862
402
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
863
403
        # unicode path
865
405
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
866
406
 
867
407
 
868
 
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
408
class TestWin32FuncsDirs(TestCaseInTempDir):
869
409
    """Test win32 functions that create files."""
 
410
    
 
411
    def test_getcwd(self):
 
412
        if win32utils.winver == 'Windows 98':
 
413
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
414
        # Make sure getcwd can handle unicode filenames
 
415
        try:
 
416
            os.mkdir(u'mu-\xb5')
 
417
        except UnicodeError:
 
418
            raise TestSkipped("Unable to create Unicode filename")
870
419
 
871
 
    def test_getcwd(self):
872
 
        self.requireFeature(tests.UnicodeFilenameFeature)
873
 
        os.mkdir(u'mu-\xb5')
874
420
        os.chdir(u'mu-\xb5')
875
421
        # TODO: jam 20060427 This will probably fail on Mac OSX because
876
422
        #       it will change the normalization of B\xe5gfors
878
424
        #       osutils.getcwd() renormalize the path.
879
425
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
880
426
 
881
 
    def test_minimum_path_selection(self):
882
 
        self.assertEqual(set(),
883
 
            osutils.minimum_path_selection([]))
884
 
        self.assertEqual(set(['a']),
885
 
            osutils.minimum_path_selection(['a']))
886
 
        self.assertEqual(set(['a', 'b']),
887
 
            osutils.minimum_path_selection(['a', 'b']))
888
 
        self.assertEqual(set(['a/', 'b']),
889
 
            osutils.minimum_path_selection(['a/', 'b']))
890
 
        self.assertEqual(set(['a/', 'b']),
891
 
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
892
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
893
 
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
894
 
 
895
427
    def test_mkdtemp(self):
896
428
        tmpdir = osutils._win32_mkdtemp(dir='.')
897
429
        self.assertFalse('\\' in tmpdir)
952
484
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
953
485
 
954
486
 
955
 
class TestParentDirectories(tests.TestCaseInTempDir):
956
 
    """Test osutils.parent_directories()"""
957
 
 
958
 
    def test_parent_directories(self):
959
 
        self.assertEqual([], osutils.parent_directories('a'))
960
 
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
961
 
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
962
 
 
963
 
 
964
 
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
487
class TestMacFuncsDirs(TestCaseInTempDir):
965
488
    """Test mac special functions that require directories."""
966
489
 
967
490
    def test_getcwd(self):
968
 
        self.requireFeature(tests.UnicodeFilenameFeature)
969
 
        os.mkdir(u'B\xe5gfors')
 
491
        # On Mac, this will actually create Ba\u030agfors
 
492
        # but chdir will still work, because it accepts both paths
 
493
        try:
 
494
            os.mkdir(u'B\xe5gfors')
 
495
        except UnicodeError:
 
496
            raise TestSkipped("Unable to create Unicode filename")
 
497
 
970
498
        os.chdir(u'B\xe5gfors')
971
499
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
972
500
 
973
501
    def test_getcwd_nonnorm(self):
974
 
        self.requireFeature(tests.UnicodeFilenameFeature)
975
502
        # Test that _mac_getcwd() will normalize this path
976
 
        os.mkdir(u'Ba\u030agfors')
 
503
        try:
 
504
            os.mkdir(u'Ba\u030agfors')
 
505
        except UnicodeError:
 
506
            raise TestSkipped("Unable to create Unicode filename")
 
507
 
977
508
        os.chdir(u'Ba\u030agfors')
978
509
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
979
510
 
980
511
 
981
 
class TestChunksToLines(tests.TestCase):
982
 
 
983
 
    def test_smoketest(self):
984
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
985
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
986
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
987
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
988
 
 
989
 
    def test_osutils_binding(self):
990
 
        from bzrlib.tests import test__chunks_to_lines
991
 
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
992
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
993
 
        else:
994
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
995
 
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
996
 
 
997
 
 
998
 
class TestSplitLines(tests.TestCase):
 
512
class TestSplitLines(TestCase):
999
513
 
1000
514
    def test_split_unicode(self):
1001
515
        self.assertEqual([u'foo\n', u'bar\xae'],
1008
522
                         osutils.split_lines('foo\rbar\n'))
1009
523
 
1010
524
 
1011
 
class TestWalkDirs(tests.TestCaseInTempDir):
1012
 
 
1013
 
    def assertExpectedBlocks(self, expected, result):
1014
 
        self.assertEqual(expected,
1015
 
                         [(dirinfo, [line[0:3] for line in block])
1016
 
                          for dirinfo, block in result])
 
525
class TestWalkDirs(TestCaseInTempDir):
1017
526
 
1018
527
    def test_walkdirs(self):
1019
528
        tree = [
1052
561
            result.append((dirdetail, dirblock))
1053
562
 
1054
563
        self.assertTrue(found_bzrdir)
1055
 
        self.assertExpectedBlocks(expected_dirblocks, result)
 
564
        self.assertEqual(expected_dirblocks,
 
565
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1056
566
        # you can search a subdir only, with a supplied prefix.
1057
567
        result = []
1058
568
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1059
569
            result.append(dirblock)
1060
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1061
 
 
1062
 
    def test_walkdirs_os_error(self):
1063
 
        # <https://bugs.launchpad.net/bzr/+bug/338653>
1064
 
        # Pyrex readdir didn't raise useful messages if it had an error
1065
 
        # reading the directory
1066
 
        if sys.platform == 'win32':
1067
 
            raise tests.TestNotApplicable(
1068
 
                "readdir IOError not tested on win32")
1069
 
        self.requireFeature(features.not_running_as_root)
1070
 
        os.mkdir("test-unreadable")
1071
 
        os.chmod("test-unreadable", 0000)
1072
 
        # must chmod it back so that it can be removed
1073
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1074
 
        # The error is not raised until the generator is actually evaluated.
1075
 
        # (It would be ok if it happened earlier but at the moment it
1076
 
        # doesn't.)
1077
 
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1078
 
        self.assertEquals('./test-unreadable', e.filename)
1079
 
        self.assertEquals(errno.EACCES, e.errno)
1080
 
        # Ensure the message contains the file name
1081
 
        self.assertContainsRe(str(e), "\./test-unreadable")
1082
 
 
1083
 
 
1084
 
    def test_walkdirs_encoding_error(self):
1085
 
        # <https://bugs.launchpad.net/bzr/+bug/488519>
1086
 
        # walkdirs didn't raise a useful message when the filenames
1087
 
        # are not using the filesystem's encoding
1088
 
 
1089
 
        # require a bytestring based filesystem
1090
 
        self.requireFeature(tests.ByteStringNamedFilesystem)
1091
 
 
1092
 
        tree = [
1093
 
            '.bzr',
1094
 
            '0file',
1095
 
            '1dir/',
1096
 
            '1dir/0file',
1097
 
            '1dir/1dir/',
1098
 
            '1file'
1099
 
            ]
1100
 
 
1101
 
        self.build_tree(tree)
1102
 
 
1103
 
        # rename the 1file to a latin-1 filename
1104
 
        os.rename("./1file", "\xe8file")
1105
 
        if "\xe8file" not in os.listdir("."):
1106
 
            self.skip("Lack filesystem that preserves arbitrary bytes")
1107
 
 
1108
 
        self._save_platform_info()
1109
 
        win32utils.winver = None # Avoid the win32 detection code
1110
 
        osutils._fs_enc = 'UTF-8'
1111
 
 
1112
 
        # this should raise on error
1113
 
        def attempt():
1114
 
            for dirdetail, dirblock in osutils.walkdirs('.'):
1115
 
                pass
1116
 
 
1117
 
        self.assertRaises(errors.BadFilenameEncoding, attempt)
 
570
        self.assertEqual(expected_dirblocks[1:],
 
571
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1118
572
 
1119
573
    def test__walkdirs_utf8(self):
1120
574
        tree = [
1153
607
            result.append((dirdetail, dirblock))
1154
608
 
1155
609
        self.assertTrue(found_bzrdir)
1156
 
        self.assertExpectedBlocks(expected_dirblocks, result)
1157
 
 
 
610
        self.assertEqual(expected_dirblocks,
 
611
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1158
612
        # you can search a subdir only, with a supplied prefix.
1159
613
        result = []
1160
614
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1161
615
            result.append(dirblock)
1162
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
616
        self.assertEqual(expected_dirblocks[1:],
 
617
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1163
618
 
1164
619
    def _filter_out_stat(self, result):
1165
620
        """Filter out the stat value from the walkdirs result"""
1170
625
                new_dirblock.append((info[0], info[1], info[2], info[4]))
1171
626
            dirblock[:] = new_dirblock
1172
627
 
1173
 
    def _save_platform_info(self):
1174
 
        self.overrideAttr(win32utils, 'winver')
1175
 
        self.overrideAttr(osutils, '_fs_enc')
1176
 
        self.overrideAttr(osutils, '_selected_dir_reader')
1177
 
 
1178
 
    def assertDirReaderIs(self, expected):
1179
 
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1180
 
        # Force it to redetect
1181
 
        osutils._selected_dir_reader = None
1182
 
        # Nothing to list, but should still trigger the selection logic
1183
 
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
1184
 
        self.assertIsInstance(osutils._selected_dir_reader, expected)
1185
 
 
1186
 
    def test_force_walkdirs_utf8_fs_utf8(self):
1187
 
        self.requireFeature(UTF8DirReaderFeature)
1188
 
        self._save_platform_info()
1189
 
        win32utils.winver = None # Avoid the win32 detection code
1190
 
        osutils._fs_enc = 'UTF-8'
1191
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1192
 
 
1193
 
    def test_force_walkdirs_utf8_fs_ascii(self):
1194
 
        self.requireFeature(UTF8DirReaderFeature)
1195
 
        self._save_platform_info()
1196
 
        win32utils.winver = None # Avoid the win32 detection code
1197
 
        osutils._fs_enc = 'US-ASCII'
1198
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1199
 
 
1200
 
    def test_force_walkdirs_utf8_fs_ANSI(self):
1201
 
        self.requireFeature(UTF8DirReaderFeature)
1202
 
        self._save_platform_info()
1203
 
        win32utils.winver = None # Avoid the win32 detection code
1204
 
        osutils._fs_enc = 'ANSI_X3.4-1968'
1205
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
1206
 
 
1207
 
    def test_force_walkdirs_utf8_fs_latin1(self):
1208
 
        self._save_platform_info()
1209
 
        win32utils.winver = None # Avoid the win32 detection code
1210
 
        osutils._fs_enc = 'latin1'
1211
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1212
 
 
1213
 
    def test_force_walkdirs_utf8_nt(self):
1214
 
        # Disabled because the thunk of the whole walkdirs api is disabled.
1215
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1216
 
        self._save_platform_info()
1217
 
        win32utils.winver = 'Windows NT'
1218
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1219
 
        self.assertDirReaderIs(Win32ReadDir)
1220
 
 
1221
 
    def test_force_walkdirs_utf8_98(self):
1222
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1223
 
        self._save_platform_info()
1224
 
        win32utils.winver = 'Windows 98'
1225
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
1226
 
 
1227
628
    def test_unicode_walkdirs(self):
1228
629
        """Walkdirs should always return unicode paths."""
1229
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1230
630
        name0 = u'0file-\xb6'
1231
631
        name1 = u'1dir-\u062c\u0648'
1232
632
        name2 = u'2file-\u0633'
1237
637
            name1 + '/' + name1 + '/',
1238
638
            name2,
1239
639
            ]
1240
 
        self.build_tree(tree)
 
640
        try:
 
641
            self.build_tree(tree)
 
642
        except UnicodeError:
 
643
            raise TestSkipped('Could not represent Unicode chars'
 
644
                              ' in current encoding.')
1241
645
        expected_dirblocks = [
1242
646
                ((u'', u'.'),
1243
647
                 [(name0, name0, 'file', './' + name0),
1269
673
 
1270
674
        The abspath portion might be in unicode or utf-8
1271
675
        """
1272
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1273
676
        name0 = u'0file-\xb6'
1274
677
        name1 = u'1dir-\u062c\u0648'
1275
678
        name2 = u'2file-\u0633'
1280
683
            name1 + '/' + name1 + '/',
1281
684
            name2,
1282
685
            ]
1283
 
        self.build_tree(tree)
 
686
        try:
 
687
            self.build_tree(tree)
 
688
        except UnicodeError:
 
689
            raise TestSkipped('Could not represent Unicode chars'
 
690
                              ' in current encoding.')
1284
691
        name0 = name0.encode('utf8')
1285
692
        name1 = name1.encode('utf8')
1286
693
        name2 = name2.encode('utf8')
1325
732
            result.append((dirdetail, new_dirblock))
1326
733
        self.assertEqual(expected_dirblocks, result)
1327
734
 
1328
 
    def test__walkdirs_utf8_with_unicode_fs(self):
1329
 
        """UnicodeDirReader should be a safe fallback everywhere
 
735
    def test_unicode__walkdirs_unicode_to_utf8(self):
 
736
        """walkdirs_unicode_to_utf8 should be a safe fallback everywhere
1330
737
 
1331
738
        The abspath portion should be in unicode
1332
739
        """
1333
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1334
 
        # Use the unicode reader. TODO: split into driver-and-driven unit
1335
 
        # tests.
1336
 
        self._save_platform_info()
1337
 
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
1338
 
        name0u = u'0file-\xb6'
1339
 
        name1u = u'1dir-\u062c\u0648'
1340
 
        name2u = u'2file-\u0633'
1341
 
        tree = [
1342
 
            name0u,
1343
 
            name1u + '/',
1344
 
            name1u + '/' + name0u,
1345
 
            name1u + '/' + name1u + '/',
1346
 
            name2u,
1347
 
            ]
1348
 
        self.build_tree(tree)
1349
 
        name0 = name0u.encode('utf8')
1350
 
        name1 = name1u.encode('utf8')
1351
 
        name2 = name2u.encode('utf8')
1352
 
 
1353
 
        # All of the abspaths should be in unicode, all of the relative paths
1354
 
        # should be in utf8
1355
 
        expected_dirblocks = [
1356
 
                (('', '.'),
1357
 
                 [(name0, name0, 'file', './' + name0u),
1358
 
                  (name1, name1, 'directory', './' + name1u),
1359
 
                  (name2, name2, 'file', './' + name2u),
1360
 
                 ]
1361
 
                ),
1362
 
                ((name1, './' + name1u),
1363
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1364
 
                                                        + '/' + name0u),
1365
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1366
 
                                                            + '/' + name1u),
1367
 
                 ]
1368
 
                ),
1369
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1370
 
                 [
1371
 
                 ]
1372
 
                ),
1373
 
            ]
1374
 
        result = list(osutils._walkdirs_utf8('.'))
1375
 
        self._filter_out_stat(result)
1376
 
        self.assertEqual(expected_dirblocks, result)
1377
 
 
1378
 
    def test__walkdirs_utf8_win32readdir(self):
1379
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1380
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1381
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1382
 
        self._save_platform_info()
1383
 
        osutils._selected_dir_reader = Win32ReadDir()
1384
 
        name0u = u'0file-\xb6'
1385
 
        name1u = u'1dir-\u062c\u0648'
1386
 
        name2u = u'2file-\u0633'
1387
 
        tree = [
1388
 
            name0u,
1389
 
            name1u + '/',
1390
 
            name1u + '/' + name0u,
1391
 
            name1u + '/' + name1u + '/',
1392
 
            name2u,
1393
 
            ]
1394
 
        self.build_tree(tree)
1395
 
        name0 = name0u.encode('utf8')
1396
 
        name1 = name1u.encode('utf8')
1397
 
        name2 = name2u.encode('utf8')
1398
 
 
1399
 
        # All of the abspaths should be in unicode, all of the relative paths
1400
 
        # should be in utf8
1401
 
        expected_dirblocks = [
1402
 
                (('', '.'),
1403
 
                 [(name0, name0, 'file', './' + name0u),
1404
 
                  (name1, name1, 'directory', './' + name1u),
1405
 
                  (name2, name2, 'file', './' + name2u),
1406
 
                 ]
1407
 
                ),
1408
 
                ((name1, './' + name1u),
1409
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1410
 
                                                        + '/' + name0u),
1411
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1412
 
                                                            + '/' + name1u),
1413
 
                 ]
1414
 
                ),
1415
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1416
 
                 [
1417
 
                 ]
1418
 
                ),
1419
 
            ]
1420
 
        result = list(osutils._walkdirs_utf8(u'.'))
1421
 
        self._filter_out_stat(result)
1422
 
        self.assertEqual(expected_dirblocks, result)
1423
 
 
1424
 
    def assertStatIsCorrect(self, path, win32stat):
1425
 
        os_stat = os.stat(path)
1426
 
        self.assertEqual(os_stat.st_size, win32stat.st_size)
1427
 
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
1428
 
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
1429
 
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
1430
 
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
1431
 
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
1432
 
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
1433
 
 
1434
 
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1435
 
        """make sure our Stat values are valid"""
1436
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1437
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1438
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1439
 
        name0u = u'0file-\xb6'
1440
 
        name0 = name0u.encode('utf8')
1441
 
        self.build_tree([name0u])
1442
 
        # I hate to sleep() here, but I'm trying to make the ctime different
1443
 
        # from the mtime
1444
 
        time.sleep(2)
1445
 
        f = open(name0u, 'ab')
 
740
        name0u = u'0file-\xb6'
 
741
        name1u = u'1dir-\u062c\u0648'
 
742
        name2u = u'2file-\u0633'
 
743
        tree = [
 
744
            name0u,
 
745
            name1u + '/',
 
746
            name1u + '/' + name0u,
 
747
            name1u + '/' + name1u + '/',
 
748
            name2u,
 
749
            ]
1446
750
        try:
1447
 
            f.write('just a small update')
1448
 
        finally:
1449
 
            f.close()
1450
 
 
1451
 
        result = Win32ReadDir().read_dir('', u'.')
1452
 
        entry = result[0]
1453
 
        self.assertEqual((name0, name0, 'file'), entry[:3])
1454
 
        self.assertEqual(u'./' + name0u, entry[4])
1455
 
        self.assertStatIsCorrect(entry[4], entry[3])
1456
 
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
1457
 
 
1458
 
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1459
 
        """make sure our Stat values are valid"""
1460
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
1461
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1462
 
        from bzrlib._walkdirs_win32 import Win32ReadDir
1463
 
        name0u = u'0dir-\u062c\u0648'
 
751
            self.build_tree(tree)
 
752
        except UnicodeError:
 
753
            raise TestSkipped('Could not represent Unicode chars'
 
754
                              ' in current encoding.')
1464
755
        name0 = name0u.encode('utf8')
1465
 
        self.build_tree([name0u + '/'])
 
756
        name1 = name1u.encode('utf8')
 
757
        name2 = name2u.encode('utf8')
1466
758
 
1467
 
        result = Win32ReadDir().read_dir('', u'.')
1468
 
        entry = result[0]
1469
 
        self.assertEqual((name0, name0, 'directory'), entry[:3])
1470
 
        self.assertEqual(u'./' + name0u, entry[4])
1471
 
        self.assertStatIsCorrect(entry[4], entry[3])
 
759
        # All of the abspaths should be in unicode, all of the relative paths
 
760
        # should be in utf8
 
761
        expected_dirblocks = [
 
762
                (('', '.'),
 
763
                 [(name0, name0, 'file', './' + name0u),
 
764
                  (name1, name1, 'directory', './' + name1u),
 
765
                  (name2, name2, 'file', './' + name2u),
 
766
                 ]
 
767
                ),
 
768
                ((name1, './' + name1u),
 
769
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
770
                                                        + '/' + name0u),
 
771
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
772
                                                            + '/' + name1u),
 
773
                 ]
 
774
                ),
 
775
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
776
                 [
 
777
                 ]
 
778
                ),
 
779
            ]
 
780
        result = list(osutils._walkdirs_unicode_to_utf8('.'))
 
781
        self._filter_out_stat(result)
 
782
        self.assertEqual(expected_dirblocks, result)
1472
783
 
1473
784
    def assertPathCompare(self, path_less, path_greater):
1474
785
        """check that path_less and path_greater compare correctly."""
1548
859
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1549
860
 
1550
861
 
1551
 
class TestCopyTree(tests.TestCaseInTempDir):
1552
 
 
 
862
class TestCopyTree(TestCaseInTempDir):
 
863
    
1553
864
    def test_copy_basic_tree(self):
1554
865
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1555
866
        osutils.copy_tree('source', 'target')
1564
875
        self.assertEqual(['c'], os.listdir('target/b'))
1565
876
 
1566
877
    def test_copy_tree_symlinks(self):
1567
 
        self.requireFeature(tests.SymlinkFeature)
 
878
        if not osutils.has_symlinks():
 
879
            return
1568
880
        self.build_tree(['source/'])
1569
881
        os.symlink('a/generic/path', 'source/lnk')
1570
882
        osutils.copy_tree('source', 'target')
1600
912
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1601
913
 
1602
914
 
1603
 
class TestSetUnsetEnv(tests.TestCase):
 
915
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
916
# [bialix] 2006/12/26
 
917
 
 
918
 
 
919
class TestSetUnsetEnv(TestCase):
1604
920
    """Test updating the environment"""
1605
921
 
1606
922
    def setUp(self):
1612
928
        def cleanup():
1613
929
            if 'BZR_TEST_ENV_VAR' in os.environ:
1614
930
                del os.environ['BZR_TEST_ENV_VAR']
 
931
 
1615
932
        self.addCleanup(cleanup)
1616
933
 
1617
934
    def test_set(self):
1629
946
 
1630
947
    def test_unicode(self):
1631
948
        """Environment can only contain plain strings
1632
 
 
 
949
        
1633
950
        So Unicode strings must be encoded.
1634
951
        """
1635
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
1636
 
        if uni_val is None:
1637
 
            raise tests.TestSkipped(
1638
 
                'Cannot find a unicode character that works in encoding %s'
1639
 
                % (osutils.get_user_encoding(),))
 
952
        # Try a few different characters, to see if we can get
 
953
        # one that will be valid in the user_encoding
 
954
        possible_vals = [u'm\xb5', u'\xe1', u'\u0410']
 
955
        for uni_val in possible_vals:
 
956
            try:
 
957
                env_val = uni_val.encode(bzrlib.user_encoding)
 
958
            except UnicodeEncodeError:
 
959
                # Try a different character
 
960
                pass
 
961
            else:
 
962
                break
 
963
        else:
 
964
            raise TestSkipped('Cannot find a unicode character that works in'
 
965
                              ' encoding %s' % (bzrlib.user_encoding,))
1640
966
 
1641
967
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1642
968
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1650
976
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1651
977
 
1652
978
 
1653
 
class TestSizeShaFile(tests.TestCaseInTempDir):
1654
 
 
1655
 
    def test_sha_empty(self):
1656
 
        self.build_tree_contents([('foo', '')])
1657
 
        expected_sha = osutils.sha_string('')
1658
 
        f = open('foo')
1659
 
        self.addCleanup(f.close)
1660
 
        size, sha = osutils.size_sha_file(f)
1661
 
        self.assertEqual(0, size)
1662
 
        self.assertEqual(expected_sha, sha)
1663
 
 
1664
 
    def test_sha_mixed_endings(self):
1665
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1666
 
        self.build_tree_contents([('foo', text)])
1667
 
        expected_sha = osutils.sha_string(text)
1668
 
        f = open('foo', 'rb')
1669
 
        self.addCleanup(f.close)
1670
 
        size, sha = osutils.size_sha_file(f)
1671
 
        self.assertEqual(38, size)
1672
 
        self.assertEqual(expected_sha, sha)
1673
 
 
1674
 
 
1675
 
class TestShaFileByName(tests.TestCaseInTempDir):
1676
 
 
1677
 
    def test_sha_empty(self):
1678
 
        self.build_tree_contents([('foo', '')])
1679
 
        expected_sha = osutils.sha_string('')
1680
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1681
 
 
1682
 
    def test_sha_mixed_endings(self):
1683
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1684
 
        self.build_tree_contents([('foo', text)])
1685
 
        expected_sha = osutils.sha_string(text)
1686
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1687
 
 
1688
 
 
1689
 
class TestResourceLoading(tests.TestCaseInTempDir):
1690
 
 
1691
 
    def test_resource_string(self):
1692
 
        # test resource in bzrlib
1693
 
        text = osutils.resource_string('bzrlib', 'debug.py')
1694
 
        self.assertContainsRe(text, "debug_flags = set()")
1695
 
        # test resource under bzrlib
1696
 
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1697
 
        self.assertContainsRe(text, "class TextUIFactory")
1698
 
        # test unsupported package
1699
 
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1700
 
            'yyy.xx')
1701
 
        # test unknown resource
1702
 
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1703
 
 
1704
 
 
1705
 
class TestReCompile(tests.TestCase):
1706
 
 
1707
 
    def _deprecated_re_compile_checked(self, *args, **kwargs):
1708
 
        return self.applyDeprecated(symbol_versioning.deprecated_in((2, 2, 0)),
1709
 
            osutils.re_compile_checked, *args, **kwargs)
1710
 
 
1711
 
    def test_re_compile_checked(self):
1712
 
        r = self._deprecated_re_compile_checked(r'A*', re.IGNORECASE)
1713
 
        self.assertTrue(r.match('aaaa'))
1714
 
        self.assertTrue(r.match('aAaA'))
1715
 
 
1716
 
    def test_re_compile_checked_error(self):
1717
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1718
 
 
1719
 
        # Due to possible test isolation error, re.compile is not lazy at
1720
 
        # this point. We re-install lazy compile.
1721
 
        lazy_regex.install_lazy_compile()
1722
 
        err = self.assertRaises(
1723
 
            errors.BzrCommandError,
1724
 
            self._deprecated_re_compile_checked, '*', re.IGNORECASE, 'test case')
1725
 
        self.assertEqual(
1726
 
            'Invalid regular expression in test case: '
1727
 
            '"*" nothing to repeat',
1728
 
            str(err))
1729
 
 
1730
 
 
1731
 
class TestDirReader(tests.TestCaseInTempDir):
1732
 
 
1733
 
    scenarios = dir_reader_scenarios()
1734
 
 
1735
 
    # Set by load_tests
1736
 
    _dir_reader_class = None
1737
 
    _native_to_unicode = None
1738
 
 
1739
 
    def setUp(self):
1740
 
        tests.TestCaseInTempDir.setUp(self)
1741
 
        self.overrideAttr(osutils,
1742
 
                          '_selected_dir_reader', self._dir_reader_class())
1743
 
 
1744
 
    def _get_ascii_tree(self):
1745
 
        tree = [
1746
 
            '0file',
1747
 
            '1dir/',
1748
 
            '1dir/0file',
1749
 
            '1dir/1dir/',
1750
 
            '2file'
1751
 
            ]
1752
 
        expected_dirblocks = [
1753
 
                (('', '.'),
1754
 
                 [('0file', '0file', 'file'),
1755
 
                  ('1dir', '1dir', 'directory'),
1756
 
                  ('2file', '2file', 'file'),
1757
 
                 ]
1758
 
                ),
1759
 
                (('1dir', './1dir'),
1760
 
                 [('1dir/0file', '0file', 'file'),
1761
 
                  ('1dir/1dir', '1dir', 'directory'),
1762
 
                 ]
1763
 
                ),
1764
 
                (('1dir/1dir', './1dir/1dir'),
1765
 
                 [
1766
 
                 ]
1767
 
                ),
1768
 
            ]
1769
 
        return tree, expected_dirblocks
1770
 
 
1771
 
    def test_walk_cur_dir(self):
1772
 
        tree, expected_dirblocks = self._get_ascii_tree()
1773
 
        self.build_tree(tree)
1774
 
        result = list(osutils._walkdirs_utf8('.'))
1775
 
        # Filter out stat and abspath
1776
 
        self.assertEqual(expected_dirblocks,
1777
 
                         [(dirinfo, [line[0:3] for line in block])
1778
 
                          for dirinfo, block in result])
1779
 
 
1780
 
    def test_walk_sub_dir(self):
1781
 
        tree, expected_dirblocks = self._get_ascii_tree()
1782
 
        self.build_tree(tree)
1783
 
        # you can search a subdir only, with a supplied prefix.
1784
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1785
 
        # Filter out stat and abspath
1786
 
        self.assertEqual(expected_dirblocks[1:],
1787
 
                         [(dirinfo, [line[0:3] for line in block])
1788
 
                          for dirinfo, block in result])
1789
 
 
1790
 
    def _get_unicode_tree(self):
1791
 
        name0u = u'0file-\xb6'
1792
 
        name1u = u'1dir-\u062c\u0648'
1793
 
        name2u = u'2file-\u0633'
1794
 
        tree = [
1795
 
            name0u,
1796
 
            name1u + '/',
1797
 
            name1u + '/' + name0u,
1798
 
            name1u + '/' + name1u + '/',
1799
 
            name2u,
1800
 
            ]
1801
 
        name0 = name0u.encode('UTF-8')
1802
 
        name1 = name1u.encode('UTF-8')
1803
 
        name2 = name2u.encode('UTF-8')
1804
 
        expected_dirblocks = [
1805
 
                (('', '.'),
1806
 
                 [(name0, name0, 'file', './' + name0u),
1807
 
                  (name1, name1, 'directory', './' + name1u),
1808
 
                  (name2, name2, 'file', './' + name2u),
1809
 
                 ]
1810
 
                ),
1811
 
                ((name1, './' + name1u),
1812
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1813
 
                                                        + '/' + name0u),
1814
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1815
 
                                                            + '/' + name1u),
1816
 
                 ]
1817
 
                ),
1818
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1819
 
                 [
1820
 
                 ]
1821
 
                ),
1822
 
            ]
1823
 
        return tree, expected_dirblocks
1824
 
 
1825
 
    def _filter_out(self, raw_dirblocks):
1826
 
        """Filter out a walkdirs_utf8 result.
1827
 
 
1828
 
        stat field is removed, all native paths are converted to unicode
1829
 
        """
1830
 
        filtered_dirblocks = []
1831
 
        for dirinfo, block in raw_dirblocks:
1832
 
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1833
 
            details = []
1834
 
            for line in block:
1835
 
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1836
 
            filtered_dirblocks.append((dirinfo, details))
1837
 
        return filtered_dirblocks
1838
 
 
1839
 
    def test_walk_unicode_tree(self):
1840
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1841
 
        tree, expected_dirblocks = self._get_unicode_tree()
1842
 
        self.build_tree(tree)
1843
 
        result = list(osutils._walkdirs_utf8('.'))
1844
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1845
 
 
1846
 
    def test_symlink(self):
1847
 
        self.requireFeature(tests.SymlinkFeature)
1848
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1849
 
        target = u'target\N{Euro Sign}'
1850
 
        link_name = u'l\N{Euro Sign}nk'
1851
 
        os.symlink(target, link_name)
1852
 
        target_utf8 = target.encode('UTF-8')
1853
 
        link_name_utf8 = link_name.encode('UTF-8')
1854
 
        expected_dirblocks = [
1855
 
                (('', '.'),
1856
 
                 [(link_name_utf8, link_name_utf8,
1857
 
                   'symlink', './' + link_name),],
1858
 
                 )]
1859
 
        result = list(osutils._walkdirs_utf8('.'))
1860
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1861
 
 
1862
 
 
1863
 
class TestReadLink(tests.TestCaseInTempDir):
1864
 
    """Exposes os.readlink() problems and the osutils solution.
1865
 
 
1866
 
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
1867
 
    unicode string will be returned if a unicode string is passed.
1868
 
 
1869
 
    But prior python versions failed to properly encode the passed unicode
1870
 
    string.
1871
 
    """
1872
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1873
 
 
1874
 
    def setUp(self):
1875
 
        super(tests.TestCaseInTempDir, self).setUp()
1876
 
        self.link = u'l\N{Euro Sign}ink'
1877
 
        self.target = u'targe\N{Euro Sign}t'
1878
 
        os.symlink(self.target, self.link)
1879
 
 
1880
 
    def test_os_readlink_link_encoding(self):
1881
 
        if sys.version_info < (2, 6):
1882
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1883
 
        else:
1884
 
            self.assertEquals(self.target,  os.readlink(self.link))
1885
 
 
1886
 
    def test_os_readlink_link_decoding(self):
1887
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
1888
 
                          os.readlink(self.link.encode(osutils._fs_enc)))
1889
 
 
1890
 
 
1891
 
class TestConcurrency(tests.TestCase):
1892
 
 
1893
 
    def setUp(self):
1894
 
        super(TestConcurrency, self).setUp()
1895
 
        self.overrideAttr(osutils, '_cached_local_concurrency')
1896
 
 
1897
 
    def test_local_concurrency(self):
1898
 
        concurrency = osutils.local_concurrency()
1899
 
        self.assertIsInstance(concurrency, int)
1900
 
 
1901
 
    def test_local_concurrency_environment_variable(self):
1902
 
        self.overrideEnv('BZR_CONCURRENCY', '2')
1903
 
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1904
 
        self.overrideEnv('BZR_CONCURRENCY', '3')
1905
 
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1906
 
        self.overrideEnv('BZR_CONCURRENCY', 'foo')
1907
 
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1908
 
 
1909
 
    def test_option_concurrency(self):
1910
 
        self.overrideEnv('BZR_CONCURRENCY', '1')
1911
 
        self.run_bzr('rocks --concurrency 42')
1912
 
        # Command line overrides environment variable
1913
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1914
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1915
 
 
1916
 
 
1917
 
class TestFailedToLoadExtension(tests.TestCase):
1918
 
 
1919
 
    def _try_loading(self):
1920
 
        try:
1921
 
            import bzrlib._fictional_extension_py
1922
 
        except ImportError, e:
1923
 
            osutils.failed_to_load_extension(e)
1924
 
            return True
1925
 
 
1926
 
    def setUp(self):
1927
 
        super(TestFailedToLoadExtension, self).setUp()
1928
 
        self.overrideAttr(osutils, '_extension_load_failures', [])
1929
 
 
1930
 
    def test_failure_to_load(self):
1931
 
        self._try_loading()
1932
 
        self.assertLength(1, osutils._extension_load_failures)
1933
 
        self.assertEquals(osutils._extension_load_failures[0],
1934
 
            "No module named _fictional_extension_py")
1935
 
 
1936
 
    def test_report_extension_load_failures_no_warning(self):
1937
 
        self.assertTrue(self._try_loading())
1938
 
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1939
 
        # it used to give a Python warning; it no longer does
1940
 
        self.assertLength(0, warnings)
1941
 
 
1942
 
    def test_report_extension_load_failures_message(self):
1943
 
        log = StringIO()
1944
 
        trace.push_log_file(log)
1945
 
        self.assertTrue(self._try_loading())
1946
 
        osutils.report_extension_load_failures()
1947
 
        self.assertContainsRe(
1948
 
            log.getvalue(),
1949
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1950
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1951
 
            )
1952
 
 
1953
 
 
1954
 
class TestTerminalWidth(tests.TestCase):
1955
 
 
1956
 
    def setUp(self):
1957
 
        tests.TestCase.setUp(self)
1958
 
        self._orig_terminal_size_state = osutils._terminal_size_state
1959
 
        self._orig_first_terminal_size = osutils._first_terminal_size
1960
 
        self.addCleanup(self.restore_osutils_globals)
1961
 
        osutils._terminal_size_state = 'no_data'
1962
 
        osutils._first_terminal_size = None
1963
 
 
1964
 
    def restore_osutils_globals(self):
1965
 
        osutils._terminal_size_state = self._orig_terminal_size_state
1966
 
        osutils._first_terminal_size = self._orig_first_terminal_size
1967
 
 
1968
 
    def replace_stdout(self, new):
1969
 
        self.overrideAttr(sys, 'stdout', new)
1970
 
 
1971
 
    def replace__terminal_size(self, new):
1972
 
        self.overrideAttr(osutils, '_terminal_size', new)
1973
 
 
1974
 
    def set_fake_tty(self):
1975
 
 
1976
 
        class I_am_a_tty(object):
1977
 
            def isatty(self):
1978
 
                return True
1979
 
 
1980
 
        self.replace_stdout(I_am_a_tty())
1981
 
 
1982
 
    def test_default_values(self):
1983
 
        self.assertEqual(80, osutils.default_terminal_width)
1984
 
 
1985
 
    def test_defaults_to_BZR_COLUMNS(self):
1986
 
        # BZR_COLUMNS is set by the test framework
1987
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1988
 
        self.overrideEnv('BZR_COLUMNS', '12')
1989
 
        self.assertEqual(12, osutils.terminal_width())
1990
 
 
1991
 
    def test_BZR_COLUMNS_0_no_limit(self):
1992
 
        self.overrideEnv('BZR_COLUMNS', '0')
1993
 
        self.assertEqual(None, osutils.terminal_width())
1994
 
 
1995
 
    def test_falls_back_to_COLUMNS(self):
1996
 
        self.overrideEnv('BZR_COLUMNS', None)
1997
 
        self.assertNotEqual('42', os.environ['COLUMNS'])
1998
 
        self.set_fake_tty()
1999
 
        self.overrideEnv('COLUMNS', '42')
2000
 
        self.assertEqual(42, osutils.terminal_width())
2001
 
 
2002
 
    def test_tty_default_without_columns(self):
2003
 
        self.overrideEnv('BZR_COLUMNS', None)
2004
 
        self.overrideEnv('COLUMNS', None)
2005
 
 
2006
 
        def terminal_size(w, h):
2007
 
            return 42, 42
2008
 
 
2009
 
        self.set_fake_tty()
2010
 
        # We need to override the osutils definition as it depends on the
2011
 
        # running environment that we can't control (PQM running without a
2012
 
        # controlling terminal is one example).
2013
 
        self.replace__terminal_size(terminal_size)
2014
 
        self.assertEqual(42, osutils.terminal_width())
2015
 
 
2016
 
    def test_non_tty_default_without_columns(self):
2017
 
        self.overrideEnv('BZR_COLUMNS', None)
2018
 
        self.overrideEnv('COLUMNS', None)
2019
 
        self.replace_stdout(None)
2020
 
        self.assertEqual(None, osutils.terminal_width())
2021
 
 
2022
 
    def test_no_TIOCGWINSZ(self):
2023
 
        self.requireFeature(term_ios_feature)
2024
 
        termios = term_ios_feature.module
2025
 
        # bug 63539 is about a termios without TIOCGWINSZ attribute
2026
 
        try:
2027
 
            orig = termios.TIOCGWINSZ
2028
 
        except AttributeError:
2029
 
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
2030
 
            pass
2031
 
        else:
2032
 
            self.overrideAttr(termios, 'TIOCGWINSZ')
2033
 
            del termios.TIOCGWINSZ
2034
 
        self.overrideEnv('BZR_COLUMNS', None)
2035
 
        self.overrideEnv('COLUMNS', None)
2036
 
        # Whatever the result is, if we don't raise an exception, it's ok.
2037
 
        osutils.terminal_width()
2038
 
 
2039
 
class TestCreationOps(tests.TestCaseInTempDir):
2040
 
    _test_needs_features = [features.chown_feature]
2041
 
 
2042
 
    def setUp(self):
2043
 
        tests.TestCaseInTempDir.setUp(self)
2044
 
        self.overrideAttr(os, 'chown', self._dummy_chown)
2045
 
 
2046
 
        # params set by call to _dummy_chown
2047
 
        self.path = self.uid = self.gid = None
2048
 
 
2049
 
    def _dummy_chown(self, path, uid, gid):
2050
 
        self.path, self.uid, self.gid = path, uid, gid
2051
 
 
2052
 
    def test_copy_ownership_from_path(self):
2053
 
        """copy_ownership_from_path test with specified src."""
2054
 
        ownsrc = '/'
2055
 
        f = open('test_file', 'wt')
2056
 
        osutils.copy_ownership_from_path('test_file', ownsrc)
2057
 
 
2058
 
        s = os.stat(ownsrc)
2059
 
        self.assertEquals(self.path, 'test_file')
2060
 
        self.assertEquals(self.uid, s.st_uid)
2061
 
        self.assertEquals(self.gid, s.st_gid)
2062
 
 
2063
 
    def test_copy_ownership_nonesrc(self):
2064
 
        """copy_ownership_from_path test with src=None."""
2065
 
        f = open('test_file', 'wt')
2066
 
        # should use parent dir for permissions
2067
 
        osutils.copy_ownership_from_path('test_file')
2068
 
 
2069
 
        s = os.stat('..')
2070
 
        self.assertEquals(self.path, 'test_file')
2071
 
        self.assertEquals(self.uid, s.st_uid)
2072
 
        self.assertEquals(self.gid, s.st_gid)
2073
 
 
2074
 
class TestGetuserUnicode(tests.TestCase):
2075
 
 
2076
 
    def test_ascii_user(self):
2077
 
        self.overrideEnv('LOGNAME', 'jrandom')
2078
 
        self.assertEqual(u'jrandom', osutils.getuser_unicode())
2079
 
 
2080
 
    def test_unicode_user(self):
2081
 
        ue = osutils.get_user_encoding()
2082
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
2083
 
        if uni_val is None:
2084
 
            raise tests.TestSkipped(
2085
 
                'Cannot find a unicode character that works in encoding %s'
2086
 
                % (osutils.get_user_encoding(),))
2087
 
        uni_username = u'jrandom' + uni_val
2088
 
        encoded_username = uni_username.encode(ue)
2089
 
        self.overrideEnv('LOGNAME', encoded_username)
2090
 
        self.assertEqual(uni_username, osutils.getuser_unicode())
2091
 
        self.overrideEnv('LOGNAME', u'jrandom\xb6'.encode(ue))
2092
 
        self.assertEqual(u'jrandom\xb6', osutils.getuser_unicode())
2093
 
 
2094
 
class TestBackupNames(tests.TestCase):
2095
 
 
2096
 
    def setUp(self):
2097
 
        super(TestBackupNames, self).setUp()
2098
 
        self.backups = []
2099
 
 
2100
 
    def backup_exists(self, name):
2101
 
        return name in self.backups
2102
 
 
2103
 
    def available_backup_name(self, name):
2104
 
        backup_name = osutils.available_backup_name(name, self.backup_exists)
2105
 
        self.backups.append(backup_name)
2106
 
        return backup_name
2107
 
 
2108
 
    def assertBackupName(self, expected, name):
2109
 
        self.assertEqual(expected, self.available_backup_name(name))
2110
 
 
2111
 
    def test_empty(self):
2112
 
        self.assertBackupName('file.~1~', 'file')
2113
 
 
2114
 
    def test_existing(self):
2115
 
        self.available_backup_name('file')
2116
 
        self.available_backup_name('file')
2117
 
        self.assertBackupName('file.~3~', 'file')
2118
 
        # Empty slots are found, this is not a strict requirement and may be
2119
 
        # revisited if we test against all implementations.
2120
 
        self.backups.remove('file.~2~')
2121
 
        self.assertBackupName('file.~2~', 'file')
2122
 
 
2123
 
 
2124
 
class TestFindExecutableInPath(tests.TestCase):
2125
 
 
2126
 
    def test_windows(self):
2127
 
        if sys.platform != 'win32':
2128
 
            raise tests.TestSkipped('test requires win32')
2129
 
        self.assertTrue(osutils.find_executable_on_path('explorer') is not None)
2130
 
        self.assertTrue(
2131
 
            osutils.find_executable_on_path('explorer.exe') is not None)
2132
 
        self.assertTrue(
2133
 
            osutils.find_executable_on_path('EXPLORER.EXE') is not None)
2134
 
        self.assertTrue(
2135
 
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
2136
 
        self.assertTrue(osutils.find_executable_on_path('file.txt') is None)
2137
 
 
2138
 
    def test_other(self):
2139
 
        if sys.platform == 'win32':
2140
 
            raise tests.TestSkipped('test requires non-win32')
2141
 
        self.assertTrue(osutils.find_executable_on_path('sh') is not None)
2142
 
        self.assertTrue(
2143
 
            osutils.find_executable_on_path('THIS SHOULD NOT EXIST') is None)
 
979
class TestLocalTimeOffset(TestCase):
 
980
 
 
981
    def test_local_time_offset(self):
 
982
        """Test that local_time_offset() returns a sane value."""
 
983
        offset = osutils.local_time_offset()
 
984
        self.assertTrue(isinstance(offset, int))
 
985
        # Test that the offset is no more than a eighteen hours in
 
986
        # either direction.
 
987
        # Time zone handling is system specific, so it is difficult to
 
988
        # do more specific tests, but a value outside of this range is
 
989
        # probably wrong.
 
990
        eighteen_hours = 18 * 3600
 
991
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
992
 
 
993
    def test_local_time_offset_with_timestamp(self):
 
994
        """Test that local_time_offset() works with a timestamp."""
 
995
        offset = osutils.local_time_offset(1000000000.1234567)
 
996
        self.assertTrue(isinstance(offset, int))
 
997
        eighteen_hours = 18 * 3600
 
998
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)