~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Martin Pool
  • Date: 2008-10-20 23:58:12 UTC
  • mto: This revision was merged to the branch mainline in revision 3787.
  • Revision ID: mbp@sourcefrog.net-20081020235812-itg90mk0u4dez92z
lp-upload-release now handles names like bzr-1.8.tar.gz

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
19
from cStringIO import StringIO
20
20
import errno
21
21
import os
22
 
import re
23
22
import socket
24
23
import stat
25
24
import sys
29
28
    errors,
30
29
    osutils,
31
30
    tests,
32
 
    trace,
33
31
    win32utils,
34
32
    )
 
33
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
34
from bzrlib.osutils import (
 
35
        is_inside_any,
 
36
        is_inside_or_parent_of_any,
 
37
        pathjoin,
 
38
        pumpfile,
 
39
        pump_string_file,
 
40
        )
35
41
from bzrlib.tests import (
36
 
    file_utils,
37
 
    test__walkdirs_win32,
 
42
        adapt_tests,
 
43
        Feature,
 
44
        probe_unicode_in_user_encoding,
 
45
        split_suite_by_re,
 
46
        StringIOWrapper,
 
47
        SymlinkFeature,
 
48
        TestCase,
 
49
        TestCaseInTempDir,
 
50
        TestScenarioApplier,
 
51
        TestSkipped,
 
52
        )
 
53
from bzrlib.tests.file_utils import (
 
54
    FakeReadFile,
38
55
    )
39
 
 
40
 
 
41
 
class _UTF8DirReaderFeature(tests.Feature):
 
56
from bzrlib.tests.test__walkdirs_win32 import Win32ReadDirFeature
 
57
 
 
58
 
 
59
class _UTF8DirReaderFeature(Feature):
42
60
 
43
61
    def _probe(self):
44
62
        try:
53
71
 
54
72
UTF8DirReaderFeature = _UTF8DirReaderFeature()
55
73
 
56
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
57
 
 
58
 
 
59
 
def _already_unicode(s):
60
 
    return s
61
 
 
62
 
 
63
 
def _utf8_to_unicode(s):
64
 
    return s.decode('UTF-8')
65
 
 
66
 
 
67
 
def dir_reader_scenarios():
68
 
    # For each dir reader we define:
69
 
 
70
 
    # - native_to_unicode: a function converting the native_abspath as returned
71
 
    #   by DirReader.read_dir to its unicode representation
72
 
 
73
 
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
74
 
    scenarios = [('unicode',
75
 
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
76
 
                       _native_to_unicode=_already_unicode))]
77
 
    # Some DirReaders are platform specific and even there they may not be
78
 
    # available.
79
 
    if UTF8DirReaderFeature.available():
80
 
        from bzrlib import _readdir_pyx
81
 
        scenarios.append(('utf8',
82
 
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
83
 
                               _native_to_unicode=_utf8_to_unicode)))
84
 
 
85
 
    if test__walkdirs_win32.win32_readdir_feature.available():
86
 
        try:
87
 
            from bzrlib import _walkdirs_win32
88
 
            scenarios.append(
89
 
                ('win32',
90
 
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
91
 
                      _native_to_unicode=_already_unicode)))
92
 
        except ImportError:
93
 
            pass
94
 
    return scenarios
95
 
 
96
 
 
97
 
def load_tests(basic_tests, module, loader):
98
 
    suite = loader.suiteClass()
99
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
100
 
        basic_tests, tests.condition_isinstance(TestDirReader))
101
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
102
 
    suite.addTest(remaining_tests)
103
 
    return suite
104
 
 
105
 
 
106
 
class TestContainsWhitespace(tests.TestCase):
 
74
 
 
75
class TestOSUtils(TestCaseInTempDir):
107
76
 
108
77
    def test_contains_whitespace(self):
109
78
        self.failUnless(osutils.contains_whitespace(u' '))
119
88
        self.failIf(osutils.contains_whitespace(u'hellothere'))
120
89
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
121
90
 
122
 
 
123
 
class TestRename(tests.TestCaseInTempDir):
124
 
 
125
 
    def create_file(self, filename, content):
126
 
        f = open(filename, 'wb')
127
 
        try:
128
 
            f.write(content)
129
 
        finally:
130
 
            f.close()
131
 
 
132
 
    def _fancy_rename(self, a, b):
133
 
        osutils.fancy_rename(a, b, rename_func=os.rename,
134
 
                             unlink_func=os.unlink)
135
 
 
136
91
    def test_fancy_rename(self):
137
92
        # This should work everywhere
138
 
        self.create_file('a', 'something in a\n')
139
 
        self._fancy_rename('a', 'b')
 
93
        def rename(a, b):
 
94
            osutils.fancy_rename(a, b,
 
95
                    rename_func=os.rename,
 
96
                    unlink_func=os.unlink)
 
97
 
 
98
        open('a', 'wb').write('something in a\n')
 
99
        rename('a', 'b')
140
100
        self.failIfExists('a')
141
101
        self.failUnlessExists('b')
142
102
        self.check_file_contents('b', 'something in a\n')
143
103
 
144
 
        self.create_file('a', 'new something in a\n')
145
 
        self._fancy_rename('b', 'a')
 
104
        open('a', 'wb').write('new something in a\n')
 
105
        rename('b', 'a')
146
106
 
147
107
        self.check_file_contents('a', 'something in a\n')
148
108
 
149
 
    def test_fancy_rename_fails_source_missing(self):
150
 
        # An exception should be raised, and the target should be left in place
151
 
        self.create_file('target', 'data in target\n')
152
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
153
 
                          'missingsource', 'target')
154
 
        self.failUnlessExists('target')
155
 
        self.check_file_contents('target', 'data in target\n')
156
 
 
157
 
    def test_fancy_rename_fails_if_source_and_target_missing(self):
158
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
159
 
                          'missingsource', 'missingtarget')
160
 
 
161
109
    def test_rename(self):
162
110
        # Rename should be semi-atomic on all platforms
163
 
        self.create_file('a', 'something in a\n')
 
111
        open('a', 'wb').write('something in a\n')
164
112
        osutils.rename('a', 'b')
165
113
        self.failIfExists('a')
166
114
        self.failUnlessExists('b')
167
115
        self.check_file_contents('b', 'something in a\n')
168
116
 
169
 
        self.create_file('a', 'new something in a\n')
 
117
        open('a', 'wb').write('new something in a\n')
170
118
        osutils.rename('b', 'a')
171
119
 
172
120
        self.check_file_contents('a', 'something in a\n')
183
131
        shape = sorted(os.listdir('.'))
184
132
        self.assertEquals(['A', 'B'], shape)
185
133
 
186
 
 
187
 
class TestRandChars(tests.TestCase):
188
 
 
189
134
    def test_01_rand_chars_empty(self):
190
135
        result = osutils.rand_chars(0)
191
136
        self.assertEqual(result, '')
196
141
        self.assertEqual(type(result), str)
197
142
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
198
143
 
199
 
 
200
 
class TestIsInside(tests.TestCase):
201
 
 
202
144
    def test_is_inside(self):
203
145
        is_inside = osutils.is_inside
204
146
        self.assertTrue(is_inside('src', 'src/foo.c'))
209
151
        self.assertTrue(is_inside('', 'foo.c'))
210
152
 
211
153
    def test_is_inside_any(self):
212
 
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
154
        SRC_FOO_C = pathjoin('src', 'foo.c')
213
155
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
214
156
                         (['src'], SRC_FOO_C),
215
157
                         (['src'], 'src'),
216
158
                         ]:
217
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
159
            self.assert_(is_inside_any(dirs, fn))
218
160
        for dirs, fn in [(['src'], 'srccontrol'),
219
161
                         (['src'], 'srccontrol/foo')]:
220
 
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
162
            self.assertFalse(is_inside_any(dirs, fn))
221
163
 
222
164
    def test_is_inside_or_parent_of_any(self):
223
165
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
226
168
                         (['src/bar.c', 'bla/foo.c'], 'src'),
227
169
                         (['src'], 'src'),
228
170
                         ]:
229
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
230
 
 
 
171
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
172
            
231
173
        for dirs, fn in [(['src'], 'srccontrol'),
232
174
                         (['srccontrol/foo.c'], 'src'),
233
175
                         (['src'], 'srccontrol/foo')]:
234
 
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
235
 
 
236
 
 
237
 
class TestRmTree(tests.TestCaseInTempDir):
 
176
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
238
177
 
239
178
    def test_rmtree(self):
240
179
        # Check that a tree with read-only files/dirs can be removed
254
193
        self.failIfExists('dir/file')
255
194
        self.failIfExists('dir')
256
195
 
257
 
 
258
 
class TestDeleteAny(tests.TestCaseInTempDir):
259
 
 
260
 
    def test_delete_any_readonly(self):
261
 
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
262
 
        self.build_tree(['d/', 'f'])
263
 
        osutils.make_readonly('d')
264
 
        osutils.make_readonly('f')
265
 
 
266
 
        osutils.delete_any('f')
267
 
        osutils.delete_any('d')
268
 
 
269
 
 
270
 
class TestKind(tests.TestCaseInTempDir):
271
 
 
272
196
    def test_file_kind(self):
273
197
        self.build_tree(['file', 'dir/'])
274
198
        self.assertEquals('file', osutils.file_kind('file'))
276
200
        if osutils.has_symlinks():
277
201
            os.symlink('symlink', 'symlink')
278
202
            self.assertEquals('symlink', osutils.file_kind('symlink'))
279
 
 
 
203
        
280
204
        # TODO: jam 20060529 Test a block device
281
205
        try:
282
206
            os.lstat('/dev/null')
304
228
                os.remove('socket')
305
229
 
306
230
    def test_kind_marker(self):
307
 
        self.assertEqual("", osutils.kind_marker("file"))
308
 
        self.assertEqual("/", osutils.kind_marker('directory'))
309
 
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
310
 
        self.assertEqual("@", osutils.kind_marker("symlink"))
311
 
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
312
 
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
313
 
 
314
 
 
315
 
class TestUmask(tests.TestCaseInTempDir):
 
231
        self.assertEqual(osutils.kind_marker('file'), '')
 
232
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
233
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
234
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
316
235
 
317
236
    def test_get_umask(self):
318
237
        if sys.platform == 'win32':
321
240
            return
322
241
 
323
242
        orig_umask = osutils.get_umask()
324
 
        self.addCleanup(os.umask, orig_umask)
325
 
        os.umask(0222)
326
 
        self.assertEqual(0222, osutils.get_umask())
327
 
        os.umask(0022)
328
 
        self.assertEqual(0022, osutils.get_umask())
329
 
        os.umask(0002)
330
 
        self.assertEqual(0002, osutils.get_umask())
331
 
        os.umask(0027)
332
 
        self.assertEqual(0027, osutils.get_umask())
333
 
 
334
 
 
335
 
class TestDateTime(tests.TestCase):
 
243
        try:
 
244
            os.umask(0222)
 
245
            self.assertEqual(0222, osutils.get_umask())
 
246
            os.umask(0022)
 
247
            self.assertEqual(0022, osutils.get_umask())
 
248
            os.umask(0002)
 
249
            self.assertEqual(0002, osutils.get_umask())
 
250
            os.umask(0027)
 
251
            self.assertEqual(0027, osutils.get_umask())
 
252
        finally:
 
253
            os.umask(orig_umask)
336
254
 
337
255
    def assertFormatedDelta(self, expected, seconds):
338
256
        """Assert osutils.format_delta formats as expected"""
380
298
        # Instead blackbox.test_locale should check for localized
381
299
        # dates once they do occur in output strings.
382
300
 
383
 
    def test_format_date_with_offset_in_original_timezone(self):
384
 
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
385
 
            osutils.format_date_with_offset_in_original_timezone(0))
386
 
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
387
 
            osutils.format_date_with_offset_in_original_timezone(100000))
388
 
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
389
 
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
390
 
 
391
 
    def test_local_time_offset(self):
392
 
        """Test that local_time_offset() returns a sane value."""
393
 
        offset = osutils.local_time_offset()
394
 
        self.assertTrue(isinstance(offset, int))
395
 
        # Test that the offset is no more than eighteen hours in
396
 
        # either direction.
397
 
        # Time zone handling is system specific, so it is difficult to
398
 
        # do more specific tests, but a value outside of this range is
399
 
        # probably wrong.
400
 
        eighteen_hours = 18 * 3600
401
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
402
 
 
403
 
    def test_local_time_offset_with_timestamp(self):
404
 
        """Test that local_time_offset() works with a timestamp."""
405
 
        offset = osutils.local_time_offset(1000000000.1234567)
406
 
        self.assertTrue(isinstance(offset, int))
407
 
        eighteen_hours = 18 * 3600
408
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
409
 
 
410
 
 
411
 
class TestLinks(tests.TestCaseInTempDir):
412
 
 
413
301
    def test_dereference_path(self):
414
 
        self.requireFeature(tests.SymlinkFeature)
 
302
        self.requireFeature(SymlinkFeature)
415
303
        cwd = osutils.realpath('.')
416
304
        os.mkdir('bar')
417
305
        bar_path = osutils.pathjoin(cwd, 'bar')
420
308
        self.assertEqual(bar_path, osutils.realpath('./bar'))
421
309
        os.symlink('bar', 'foo')
422
310
        self.assertEqual(bar_path, osutils.realpath('./foo'))
423
 
 
 
311
        
424
312
        # Does not dereference terminal symlinks
425
313
        foo_path = osutils.pathjoin(cwd, 'foo')
426
314
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
458
346
            osutils.make_readonly('dangling')
459
347
            osutils.make_writable('dangling')
460
348
 
 
349
    def test_kind_marker(self):
 
350
        self.assertEqual("", osutils.kind_marker("file"))
 
351
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
352
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
353
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
354
 
461
355
    def test_host_os_dereferences_symlinks(self):
462
356
        osutils.host_os_dereferences_symlinks()
463
357
 
464
358
 
465
 
class TestCanonicalRelPath(tests.TestCaseInTempDir):
466
 
 
467
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
468
 
 
469
 
    def test_canonical_relpath_simple(self):
470
 
        f = file('MixedCaseName', 'w')
471
 
        f.close()
472
 
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
473
 
        self.failUnlessEqual('work/MixedCaseName', actual)
474
 
 
475
 
    def test_canonical_relpath_missing_tail(self):
476
 
        os.mkdir('MixedCaseParent')
477
 
        actual = osutils.canonical_relpath(self.test_base_dir,
478
 
                                           'mixedcaseparent/nochild')
479
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
480
 
 
481
 
 
482
 
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
483
 
 
484
 
    def assertRelpath(self, expected, base, path):
485
 
        actual = osutils._cicp_canonical_relpath(base, path)
486
 
        self.assertEqual(expected, actual)
487
 
 
488
 
    def test_simple(self):
489
 
        self.build_tree(['MixedCaseName'])
490
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
491
 
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
492
 
 
493
 
    def test_subdir_missing_tail(self):
494
 
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
495
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
496
 
        self.assertRelpath('MixedCaseParent/a_child', base,
497
 
                           'MixedCaseParent/a_child')
498
 
        self.assertRelpath('MixedCaseParent/a_child', base,
499
 
                           'MixedCaseParent/A_Child')
500
 
        self.assertRelpath('MixedCaseParent/not_child', base,
501
 
                           'MixedCaseParent/not_child')
502
 
 
503
 
    def test_at_root_slash(self):
504
 
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
505
 
        # check...
506
 
        if osutils.MIN_ABS_PATHLENGTH > 1:
507
 
            raise tests.TestSkipped('relpath requires %d chars'
508
 
                                    % osutils.MIN_ABS_PATHLENGTH)
509
 
        self.assertRelpath('foo', '/', '/foo')
510
 
 
511
 
    def test_at_root_drive(self):
512
 
        if sys.platform != 'win32':
513
 
            raise tests.TestNotApplicable('we can only test drive-letter relative'
514
 
                                          ' paths on Windows where we have drive'
515
 
                                          ' letters.')
516
 
        # see bug #322807
517
 
        # The specific issue is that when at the root of a drive, 'abspath'
518
 
        # returns "C:/" or just "/". However, the code assumes that abspath
519
 
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
520
 
        self.assertRelpath('foo', 'C:/', 'C:/foo')
521
 
        self.assertRelpath('foo', 'X:/', 'X:/foo')
522
 
        self.assertRelpath('foo', 'X:/', 'X://foo')
523
 
 
524
 
 
525
 
class TestPumpFile(tests.TestCase):
 
359
class TestPumpFile(TestCase):
526
360
    """Test pumpfile method."""
527
 
 
528
361
    def setUp(self):
529
 
        tests.TestCase.setUp(self)
530
362
        # create a test datablock
531
363
        self.block_size = 512
532
364
        pattern = '0123456789ABCDEF'
539
371
        # make sure test data is larger than max read size
540
372
        self.assertTrue(self.test_data_len > self.block_size)
541
373
 
542
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
374
        from_file = FakeReadFile(self.test_data)
543
375
        to_file = StringIO()
544
376
 
545
377
        # read (max / 2) bytes and verify read size wasn't affected
546
378
        num_bytes_to_read = self.block_size / 2
547
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
379
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
548
380
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
549
381
        self.assertEqual(from_file.get_read_count(), 1)
550
382
 
551
383
        # read (max) bytes and verify read size wasn't affected
552
384
        num_bytes_to_read = self.block_size
553
385
        from_file.reset_read_count()
554
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
386
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
555
387
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
556
388
        self.assertEqual(from_file.get_read_count(), 1)
557
389
 
558
390
        # read (max + 1) bytes and verify read size was limited
559
391
        num_bytes_to_read = self.block_size + 1
560
392
        from_file.reset_read_count()
561
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
393
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
562
394
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
563
395
        self.assertEqual(from_file.get_read_count(), 2)
564
396
 
565
397
        # finish reading the rest of the data
566
398
        num_bytes_to_read = self.test_data_len - to_file.tell()
567
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
399
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
568
400
 
569
401
        # report error if the data wasn't equal (we only report the size due
570
402
        # to the length of the data)
580
412
        self.assertTrue(self.test_data_len > self.block_size)
581
413
 
582
414
        # retrieve data in blocks
583
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
415
        from_file = FakeReadFile(self.test_data)
584
416
        to_file = StringIO()
585
 
        osutils.pumpfile(from_file, to_file, self.test_data_len,
586
 
                         self.block_size)
 
417
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
587
418
 
588
419
        # verify read size was equal to the maximum read size
589
420
        self.assertTrue(from_file.get_max_read_size() > 0)
604
435
        self.assertTrue(self.test_data_len > self.block_size)
605
436
 
606
437
        # retrieve data to EOF
607
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
438
        from_file = FakeReadFile(self.test_data)
608
439
        to_file = StringIO()
609
 
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
440
        pumpfile(from_file, to_file, -1, self.block_size)
610
441
 
611
442
        # verify read size was equal to the maximum read size
612
443
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
624
455
        test verifies that any existing usages of pumpfile will not be broken
625
456
        with this new version."""
626
457
        # retrieve data using default (old) pumpfile method
627
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
458
        from_file = FakeReadFile(self.test_data)
628
459
        to_file = StringIO()
629
 
        osutils.pumpfile(from_file, to_file)
 
460
        pumpfile(from_file, to_file)
630
461
 
631
462
        # report error if the data wasn't equal (we only report the size due
632
463
        # to the length of the data)
635
466
            message = "Data not equal.  Expected %d bytes, received %d."
636
467
            self.fail(message % (len(response_data), self.test_data_len))
637
468
 
638
 
    def test_report_activity(self):
639
 
        activity = []
640
 
        def log_activity(length, direction):
641
 
            activity.append((length, direction))
642
 
        from_file = StringIO(self.test_data)
643
 
        to_file = StringIO()
644
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
645
 
                         report_activity=log_activity, direction='read')
646
 
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
647
 
                          (36, 'read')], activity)
648
 
 
649
 
        from_file = StringIO(self.test_data)
650
 
        to_file = StringIO()
651
 
        del activity[:]
652
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
653
 
                         report_activity=log_activity, direction='write')
654
 
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
655
 
                          (36, 'write')], activity)
656
 
 
657
 
        # And with a limited amount of data
658
 
        from_file = StringIO(self.test_data)
659
 
        to_file = StringIO()
660
 
        del activity[:]
661
 
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
662
 
                         report_activity=log_activity, direction='read')
663
 
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
664
 
 
665
 
 
666
 
 
667
 
class TestPumpStringFile(tests.TestCase):
 
469
 
 
470
class TestPumpStringFile(TestCase):
668
471
 
669
472
    def test_empty(self):
670
473
        output = StringIO()
671
 
        osutils.pump_string_file("", output)
 
474
        pump_string_file("", output)
672
475
        self.assertEqual("", output.getvalue())
673
476
 
674
477
    def test_more_than_segment_size(self):
675
478
        output = StringIO()
676
 
        osutils.pump_string_file("123456789", output, 2)
 
479
        pump_string_file("123456789", output, 2)
677
480
        self.assertEqual("123456789", output.getvalue())
678
481
 
679
482
    def test_segment_size(self):
680
483
        output = StringIO()
681
 
        osutils.pump_string_file("12", output, 2)
 
484
        pump_string_file("12", output, 2)
682
485
        self.assertEqual("12", output.getvalue())
683
486
 
684
487
    def test_segment_size_multiple(self):
685
488
        output = StringIO()
686
 
        osutils.pump_string_file("1234", output, 2)
 
489
        pump_string_file("1234", output, 2)
687
490
        self.assertEqual("1234", output.getvalue())
688
491
 
689
492
 
690
 
class TestRelpath(tests.TestCase):
691
 
 
692
 
    def test_simple_relpath(self):
693
 
        cwd = osutils.getcwd()
694
 
        subdir = cwd + '/subdir'
695
 
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
696
 
 
697
 
    def test_deep_relpath(self):
698
 
        cwd = osutils.getcwd()
699
 
        subdir = cwd + '/sub/subsubdir'
700
 
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
701
 
 
702
 
    def test_not_relative(self):
703
 
        self.assertRaises(errors.PathNotChild,
704
 
                          osutils.relpath, 'C:/path', 'H:/path')
705
 
        self.assertRaises(errors.PathNotChild,
706
 
                          osutils.relpath, 'C:/', 'H:/path')
707
 
 
708
 
 
709
 
class TestSafeUnicode(tests.TestCase):
 
493
class TestSafeUnicode(TestCase):
710
494
 
711
495
    def test_from_ascii_string(self):
712
496
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
721
505
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
722
506
 
723
507
    def test_bad_utf8_string(self):
724
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
508
        self.assertRaises(BzrBadParameterNotUnicode,
725
509
                          osutils.safe_unicode,
726
510
                          '\xbb\xbb')
727
511
 
728
512
 
729
 
class TestSafeUtf8(tests.TestCase):
 
513
class TestSafeUtf8(TestCase):
730
514
 
731
515
    def test_from_ascii_string(self):
732
516
        f = 'foobar'
742
526
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
743
527
 
744
528
    def test_bad_utf8_string(self):
745
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
529
        self.assertRaises(BzrBadParameterNotUnicode,
746
530
                          osutils.safe_utf8, '\xbb\xbb')
747
531
 
748
532
 
749
 
class TestSafeRevisionId(tests.TestCase):
 
533
class TestSafeRevisionId(TestCase):
750
534
 
751
535
    def test_from_ascii_string(self):
752
536
        # this shouldn't give a warning because it's getting an ascii string
774
558
        self.assertEqual(None, osutils.safe_revision_id(None))
775
559
 
776
560
 
777
 
class TestSafeFileId(tests.TestCase):
 
561
class TestSafeFileId(TestCase):
778
562
 
779
563
    def test_from_ascii_string(self):
780
564
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
800
584
        self.assertEqual(None, osutils.safe_file_id(None))
801
585
 
802
586
 
803
 
class TestWin32Funcs(tests.TestCase):
804
 
    """Test that _win32 versions of os utilities return appropriate paths."""
 
587
class TestWin32Funcs(TestCase):
 
588
    """Test that the _win32 versions of os utilities return appropriate paths."""
805
589
 
806
590
    def test_abspath(self):
807
591
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
814
598
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
815
599
 
816
600
    def test_pathjoin(self):
817
 
        self.assertEqual('path/to/foo',
818
 
                         osutils._win32_pathjoin('path', 'to', 'foo'))
819
 
        self.assertEqual('C:/foo',
820
 
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
821
 
        self.assertEqual('C:/foo',
822
 
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
823
 
        self.assertEqual('path/to/foo',
824
 
                         osutils._win32_pathjoin('path/to/', 'foo'))
825
 
        self.assertEqual('/foo',
826
 
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
827
 
        self.assertEqual('/foo',
828
 
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
601
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
602
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
603
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
604
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
605
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
606
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
829
607
 
830
608
    def test_normpath(self):
831
 
        self.assertEqual('path/to/foo',
832
 
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
833
 
        self.assertEqual('path/to/foo',
834
 
                         osutils._win32_normpath('path//from/../to/./foo'))
 
609
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
610
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
835
611
 
836
612
    def test_getcwd(self):
837
613
        cwd = osutils._win32_getcwd()
866
642
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
867
643
 
868
644
 
869
 
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
645
class TestWin32FuncsDirs(TestCaseInTempDir):
870
646
    """Test win32 functions that create files."""
 
647
    
 
648
    def test_getcwd(self):
 
649
        if win32utils.winver == 'Windows 98':
 
650
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
651
        # Make sure getcwd can handle unicode filenames
 
652
        try:
 
653
            os.mkdir(u'mu-\xb5')
 
654
        except UnicodeError:
 
655
            raise TestSkipped("Unable to create Unicode filename")
871
656
 
872
 
    def test_getcwd(self):
873
 
        self.requireFeature(tests.UnicodeFilenameFeature)
874
 
        os.mkdir(u'mu-\xb5')
875
657
        os.chdir(u'mu-\xb5')
876
658
        # TODO: jam 20060427 This will probably fail on Mac OSX because
877
659
        #       it will change the normalization of B\xe5gfors
882
664
    def test_minimum_path_selection(self):
883
665
        self.assertEqual(set(),
884
666
            osutils.minimum_path_selection([]))
885
 
        self.assertEqual(set(['a']),
886
 
            osutils.minimum_path_selection(['a']))
887
667
        self.assertEqual(set(['a', 'b']),
888
668
            osutils.minimum_path_selection(['a', 'b']))
889
669
        self.assertEqual(set(['a/', 'b']),
890
670
            osutils.minimum_path_selection(['a/', 'b']))
891
671
        self.assertEqual(set(['a/', 'b']),
892
672
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
893
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
894
 
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
895
673
 
896
674
    def test_mkdtemp(self):
897
675
        tmpdir = osutils._win32_mkdtemp(dir='.')
953
731
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
954
732
 
955
733
 
956
 
class TestParentDirectories(tests.TestCaseInTempDir):
957
 
    """Test osutils.parent_directories()"""
958
 
 
959
 
    def test_parent_directories(self):
960
 
        self.assertEqual([], osutils.parent_directories('a'))
961
 
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
962
 
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
963
 
 
964
 
 
965
 
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
734
class TestMacFuncsDirs(TestCaseInTempDir):
966
735
    """Test mac special functions that require directories."""
967
736
 
968
737
    def test_getcwd(self):
969
 
        self.requireFeature(tests.UnicodeFilenameFeature)
970
 
        os.mkdir(u'B\xe5gfors')
 
738
        # On Mac, this will actually create Ba\u030agfors
 
739
        # but chdir will still work, because it accepts both paths
 
740
        try:
 
741
            os.mkdir(u'B\xe5gfors')
 
742
        except UnicodeError:
 
743
            raise TestSkipped("Unable to create Unicode filename")
 
744
 
971
745
        os.chdir(u'B\xe5gfors')
972
746
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
973
747
 
974
748
    def test_getcwd_nonnorm(self):
975
 
        self.requireFeature(tests.UnicodeFilenameFeature)
976
749
        # Test that _mac_getcwd() will normalize this path
977
 
        os.mkdir(u'Ba\u030agfors')
 
750
        try:
 
751
            os.mkdir(u'Ba\u030agfors')
 
752
        except UnicodeError:
 
753
            raise TestSkipped("Unable to create Unicode filename")
 
754
 
978
755
        os.chdir(u'Ba\u030agfors')
979
756
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
980
757
 
981
758
 
982
 
class TestChunksToLines(tests.TestCase):
983
 
 
984
 
    def test_smoketest(self):
985
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
986
 
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
987
 
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
988
 
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
989
 
 
990
 
    def test_osutils_binding(self):
991
 
        from bzrlib.tests import test__chunks_to_lines
992
 
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
993
 
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
994
 
        else:
995
 
            from bzrlib._chunks_to_lines_py import chunks_to_lines
996
 
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
997
 
 
998
 
 
999
 
class TestSplitLines(tests.TestCase):
 
759
class TestSplitLines(TestCase):
1000
760
 
1001
761
    def test_split_unicode(self):
1002
762
        self.assertEqual([u'foo\n', u'bar\xae'],
1009
769
                         osutils.split_lines('foo\rbar\n'))
1010
770
 
1011
771
 
1012
 
class TestWalkDirs(tests.TestCaseInTempDir):
1013
 
 
1014
 
    def assertExpectedBlocks(self, expected, result):
1015
 
        self.assertEqual(expected,
1016
 
                         [(dirinfo, [line[0:3] for line in block])
1017
 
                          for dirinfo, block in result])
 
772
class TestWalkDirs(TestCaseInTempDir):
1018
773
 
1019
774
    def test_walkdirs(self):
1020
775
        tree = [
1053
808
            result.append((dirdetail, dirblock))
1054
809
 
1055
810
        self.assertTrue(found_bzrdir)
1056
 
        self.assertExpectedBlocks(expected_dirblocks, result)
 
811
        self.assertEqual(expected_dirblocks,
 
812
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1057
813
        # you can search a subdir only, with a supplied prefix.
1058
814
        result = []
1059
815
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1060
816
            result.append(dirblock)
1061
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1062
 
 
1063
 
    def test_walkdirs_os_error(self):
1064
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1065
 
        # Pyrex readdir didn't raise useful messages if it had an error
1066
 
        # reading the directory
1067
 
        if sys.platform == 'win32':
1068
 
            raise tests.TestNotApplicable(
1069
 
                "readdir IOError not tested on win32")
1070
 
        os.mkdir("test-unreadable")
1071
 
        os.chmod("test-unreadable", 0000)
1072
 
        # must chmod it back so that it can be removed
1073
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1074
 
        # The error is not raised until the generator is actually evaluated.
1075
 
        # (It would be ok if it happened earlier but at the moment it
1076
 
        # doesn't.)
1077
 
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1078
 
        self.assertEquals('./test-unreadable', e.filename)
1079
 
        self.assertEquals(errno.EACCES, e.errno)
1080
 
        # Ensure the message contains the file name
1081
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
817
        self.assertEqual(expected_dirblocks[1:],
 
818
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1082
819
 
1083
820
    def test__walkdirs_utf8(self):
1084
821
        tree = [
1117
854
            result.append((dirdetail, dirblock))
1118
855
 
1119
856
        self.assertTrue(found_bzrdir)
1120
 
        self.assertExpectedBlocks(expected_dirblocks, result)
1121
 
 
 
857
        self.assertEqual(expected_dirblocks,
 
858
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1122
859
        # you can search a subdir only, with a supplied prefix.
1123
860
        result = []
1124
861
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1125
862
            result.append(dirblock)
1126
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
863
        self.assertEqual(expected_dirblocks[1:],
 
864
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1127
865
 
1128
866
    def _filter_out_stat(self, result):
1129
867
        """Filter out the stat value from the walkdirs result"""
1135
873
            dirblock[:] = new_dirblock
1136
874
 
1137
875
    def _save_platform_info(self):
1138
 
        self.overrideAttr(win32utils, 'winver')
1139
 
        self.overrideAttr(osutils, '_fs_enc')
1140
 
        self.overrideAttr(osutils, '_selected_dir_reader')
 
876
        cur_winver = win32utils.winver
 
877
        cur_fs_enc = osutils._fs_enc
 
878
        cur_dir_reader = osutils._selected_dir_reader
 
879
        def restore():
 
880
            win32utils.winver = cur_winver
 
881
            osutils._fs_enc = cur_fs_enc
 
882
            osutils._selected_dir_reader = cur_dir_reader
 
883
        self.addCleanup(restore)
1141
884
 
1142
 
    def assertDirReaderIs(self, expected):
 
885
    def assertReadFSDirIs(self, expected):
1143
886
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1144
887
        # Force it to redetect
1145
888
        osutils._selected_dir_reader = None
1152
895
        self._save_platform_info()
1153
896
        win32utils.winver = None # Avoid the win32 detection code
1154
897
        osutils._fs_enc = 'UTF-8'
1155
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
898
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1156
899
 
1157
900
    def test_force_walkdirs_utf8_fs_ascii(self):
1158
901
        self.requireFeature(UTF8DirReaderFeature)
1159
902
        self._save_platform_info()
1160
903
        win32utils.winver = None # Avoid the win32 detection code
1161
904
        osutils._fs_enc = 'US-ASCII'
1162
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
905
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1163
906
 
1164
907
    def test_force_walkdirs_utf8_fs_ANSI(self):
1165
908
        self.requireFeature(UTF8DirReaderFeature)
1166
909
        self._save_platform_info()
1167
910
        win32utils.winver = None # Avoid the win32 detection code
1168
911
        osutils._fs_enc = 'ANSI_X3.4-1968'
1169
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
912
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1170
913
 
1171
914
    def test_force_walkdirs_utf8_fs_latin1(self):
1172
915
        self._save_platform_info()
1173
916
        win32utils.winver = None # Avoid the win32 detection code
1174
917
        osutils._fs_enc = 'latin1'
1175
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
918
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
1176
919
 
1177
920
    def test_force_walkdirs_utf8_nt(self):
1178
921
        # Disabled because the thunk of the whole walkdirs api is disabled.
1179
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
922
        self.requireFeature(Win32ReadDirFeature)
1180
923
        self._save_platform_info()
1181
924
        win32utils.winver = 'Windows NT'
1182
925
        from bzrlib._walkdirs_win32 import Win32ReadDir
1183
 
        self.assertDirReaderIs(Win32ReadDir)
 
926
        self.assertReadFSDirIs(Win32ReadDir)
1184
927
 
1185
928
    def test_force_walkdirs_utf8_98(self):
1186
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
929
        self.requireFeature(Win32ReadDirFeature)
1187
930
        self._save_platform_info()
1188
931
        win32utils.winver = 'Windows 98'
1189
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
932
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
1190
933
 
1191
934
    def test_unicode_walkdirs(self):
1192
935
        """Walkdirs should always return unicode paths."""
1193
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1194
936
        name0 = u'0file-\xb6'
1195
937
        name1 = u'1dir-\u062c\u0648'
1196
938
        name2 = u'2file-\u0633'
1201
943
            name1 + '/' + name1 + '/',
1202
944
            name2,
1203
945
            ]
1204
 
        self.build_tree(tree)
 
946
        try:
 
947
            self.build_tree(tree)
 
948
        except UnicodeError:
 
949
            raise TestSkipped('Could not represent Unicode chars'
 
950
                              ' in current encoding.')
1205
951
        expected_dirblocks = [
1206
952
                ((u'', u'.'),
1207
953
                 [(name0, name0, 'file', './' + name0),
1233
979
 
1234
980
        The abspath portion might be in unicode or utf-8
1235
981
        """
1236
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1237
982
        name0 = u'0file-\xb6'
1238
983
        name1 = u'1dir-\u062c\u0648'
1239
984
        name2 = u'2file-\u0633'
1244
989
            name1 + '/' + name1 + '/',
1245
990
            name2,
1246
991
            ]
1247
 
        self.build_tree(tree)
 
992
        try:
 
993
            self.build_tree(tree)
 
994
        except UnicodeError:
 
995
            raise TestSkipped('Could not represent Unicode chars'
 
996
                              ' in current encoding.')
1248
997
        name0 = name0.encode('utf8')
1249
998
        name1 = name1.encode('utf8')
1250
999
        name2 = name2.encode('utf8')
1294
1043
 
1295
1044
        The abspath portion should be in unicode
1296
1045
        """
1297
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1298
1046
        # Use the unicode reader. TODO: split into driver-and-driven unit
1299
1047
        # tests.
1300
1048
        self._save_platform_info()
1309
1057
            name1u + '/' + name1u + '/',
1310
1058
            name2u,
1311
1059
            ]
1312
 
        self.build_tree(tree)
 
1060
        try:
 
1061
            self.build_tree(tree)
 
1062
        except UnicodeError:
 
1063
            raise TestSkipped('Could not represent Unicode chars'
 
1064
                              ' in current encoding.')
1313
1065
        name0 = name0u.encode('utf8')
1314
1066
        name1 = name1u.encode('utf8')
1315
1067
        name2 = name2u.encode('utf8')
1340
1092
        self.assertEqual(expected_dirblocks, result)
1341
1093
 
1342
1094
    def test__walkdirs_utf8_win32readdir(self):
1343
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1095
        self.requireFeature(Win32ReadDirFeature)
1344
1096
        self.requireFeature(tests.UnicodeFilenameFeature)
1345
1097
        from bzrlib._walkdirs_win32 import Win32ReadDir
1346
1098
        self._save_platform_info()
1397
1149
 
1398
1150
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1399
1151
        """make sure our Stat values are valid"""
1400
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1152
        self.requireFeature(Win32ReadDirFeature)
1401
1153
        self.requireFeature(tests.UnicodeFilenameFeature)
1402
1154
        from bzrlib._walkdirs_win32 import Win32ReadDir
1403
1155
        name0u = u'0file-\xb6'
1421
1173
 
1422
1174
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1423
1175
        """make sure our Stat values are valid"""
1424
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1176
        self.requireFeature(Win32ReadDirFeature)
1425
1177
        self.requireFeature(tests.UnicodeFilenameFeature)
1426
1178
        from bzrlib._walkdirs_win32 import Win32ReadDir
1427
1179
        name0u = u'0dir-\u062c\u0648'
1512
1264
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1513
1265
 
1514
1266
 
1515
 
class TestCopyTree(tests.TestCaseInTempDir):
1516
 
 
 
1267
class TestCopyTree(TestCaseInTempDir):
 
1268
    
1517
1269
    def test_copy_basic_tree(self):
1518
1270
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1519
1271
        osutils.copy_tree('source', 'target')
1528
1280
        self.assertEqual(['c'], os.listdir('target/b'))
1529
1281
 
1530
1282
    def test_copy_tree_symlinks(self):
1531
 
        self.requireFeature(tests.SymlinkFeature)
 
1283
        self.requireFeature(SymlinkFeature)
1532
1284
        self.build_tree(['source/'])
1533
1285
        os.symlink('a/generic/path', 'source/lnk')
1534
1286
        osutils.copy_tree('source', 'target')
1564
1316
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1565
1317
 
1566
1318
 
1567
 
class TestSetUnsetEnv(tests.TestCase):
 
1319
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1320
# [bialix] 2006/12/26
 
1321
 
 
1322
 
 
1323
class TestSetUnsetEnv(TestCase):
1568
1324
    """Test updating the environment"""
1569
1325
 
1570
1326
    def setUp(self):
1576
1332
        def cleanup():
1577
1333
            if 'BZR_TEST_ENV_VAR' in os.environ:
1578
1334
                del os.environ['BZR_TEST_ENV_VAR']
 
1335
 
1579
1336
        self.addCleanup(cleanup)
1580
1337
 
1581
1338
    def test_set(self):
1593
1350
 
1594
1351
    def test_unicode(self):
1595
1352
        """Environment can only contain plain strings
1596
 
 
 
1353
        
1597
1354
        So Unicode strings must be encoded.
1598
1355
        """
1599
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1356
        uni_val, env_val = probe_unicode_in_user_encoding()
1600
1357
        if uni_val is None:
1601
 
            raise tests.TestSkipped(
1602
 
                'Cannot find a unicode character that works in encoding %s'
1603
 
                % (osutils.get_user_encoding(),))
 
1358
            raise TestSkipped('Cannot find a unicode character that works in'
 
1359
                              ' encoding %s' % (osutils.get_user_encoding(),))
1604
1360
 
1605
1361
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1606
1362
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1614
1370
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1615
1371
 
1616
1372
 
1617
 
class TestSizeShaFile(tests.TestCaseInTempDir):
1618
 
 
1619
 
    def test_sha_empty(self):
1620
 
        self.build_tree_contents([('foo', '')])
1621
 
        expected_sha = osutils.sha_string('')
1622
 
        f = open('foo')
1623
 
        self.addCleanup(f.close)
1624
 
        size, sha = osutils.size_sha_file(f)
1625
 
        self.assertEqual(0, size)
1626
 
        self.assertEqual(expected_sha, sha)
1627
 
 
1628
 
    def test_sha_mixed_endings(self):
1629
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1630
 
        self.build_tree_contents([('foo', text)])
1631
 
        expected_sha = osutils.sha_string(text)
1632
 
        f = open('foo', 'rb')
1633
 
        self.addCleanup(f.close)
1634
 
        size, sha = osutils.size_sha_file(f)
1635
 
        self.assertEqual(38, size)
1636
 
        self.assertEqual(expected_sha, sha)
1637
 
 
1638
 
 
1639
 
class TestShaFileByName(tests.TestCaseInTempDir):
1640
 
 
1641
 
    def test_sha_empty(self):
1642
 
        self.build_tree_contents([('foo', '')])
1643
 
        expected_sha = osutils.sha_string('')
1644
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1645
 
 
1646
 
    def test_sha_mixed_endings(self):
1647
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1648
 
        self.build_tree_contents([('foo', text)])
1649
 
        expected_sha = osutils.sha_string(text)
1650
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1651
 
 
1652
 
 
1653
 
class TestResourceLoading(tests.TestCaseInTempDir):
 
1373
class TestLocalTimeOffset(TestCase):
 
1374
 
 
1375
    def test_local_time_offset(self):
 
1376
        """Test that local_time_offset() returns a sane value."""
 
1377
        offset = osutils.local_time_offset()
 
1378
        self.assertTrue(isinstance(offset, int))
 
1379
        # Test that the offset is no more than eighteen hours in
 
1380
        # either direction.
 
1381
        # Time zone handling is system specific, so it is difficult to
 
1382
        # do more specific tests, but a value outside of this range is
 
1383
        # probably wrong.
 
1384
        eighteen_hours = 18 * 3600
 
1385
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1386
 
 
1387
    def test_local_time_offset_with_timestamp(self):
 
1388
        """Test that local_time_offset() works with a timestamp."""
 
1389
        offset = osutils.local_time_offset(1000000000.1234567)
 
1390
        self.assertTrue(isinstance(offset, int))
 
1391
        eighteen_hours = 18 * 3600
 
1392
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1393
 
 
1394
 
 
1395
class TestShaFileByName(TestCaseInTempDir):
 
1396
 
 
1397
    def test_sha_empty(self):
 
1398
        self.build_tree_contents([('foo', '')])
 
1399
        expected_sha = osutils.sha_string('')
 
1400
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1401
 
 
1402
    def test_sha_mixed_endings(self):
 
1403
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1404
        self.build_tree_contents([('foo', text)])
 
1405
        expected_sha = osutils.sha_string(text)
 
1406
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1407
 
 
1408
 
 
1409
_debug_text = \
 
1410
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1411
#
 
1412
# This program is free software; you can redistribute it and/or modify
 
1413
# it under the terms of the GNU General Public License as published by
 
1414
# the Free Software Foundation; either version 2 of the License, or
 
1415
# (at your option) any later version.
 
1416
#
 
1417
# This program is distributed in the hope that it will be useful,
 
1418
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1419
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1420
# GNU General Public License for more details.
 
1421
#
 
1422
# You should have received a copy of the GNU General Public License
 
1423
# along with this program; if not, write to the Free Software
 
1424
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1425
 
 
1426
 
 
1427
# NOTE: If update these, please also update the help for global-options in
 
1428
#       bzrlib/help_topics/__init__.py
 
1429
 
 
1430
debug_flags = set()
 
1431
"""Set of flags that enable different debug behaviour.
 
1432
 
 
1433
These are set with eg ``-Dlock`` on the bzr command line.
 
1434
 
 
1435
Options include:
 
1436
 
 
1437
 * auth - show authentication sections used
 
1438
 * error - show stack traces for all top level exceptions
 
1439
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1440
 * fetch - trace history copying between repositories
 
1441
 * graph - trace graph traversal information
 
1442
 * hashcache - log every time a working file is read to determine its hash
 
1443
 * hooks - trace hook execution
 
1444
 * hpss - trace smart protocol requests and responses
 
1445
 * http - trace http connections, requests and responses
 
1446
 * index - trace major index operations
 
1447
 * knit - trace knit operations
 
1448
 * lock - trace when lockdir locks are taken or released
 
1449
 * merge - emit information for debugging merges
 
1450
 * pack - emit information about pack operations
 
1451
 
 
1452
"""
 
1453
'''
 
1454
 
 
1455
 
 
1456
class TestResourceLoading(TestCaseInTempDir):
1654
1457
 
1655
1458
    def test_resource_string(self):
1656
1459
        # test resource in bzrlib
1657
1460
        text = osutils.resource_string('bzrlib', 'debug.py')
1658
 
        self.assertContainsRe(text, "debug_flags = set()")
 
1461
        self.assertEquals(_debug_text, text)
1659
1462
        # test resource under bzrlib
1660
1463
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1661
1464
        self.assertContainsRe(text, "class TextUIFactory")
1664
1467
            'yyy.xx')
1665
1468
        # test unknown resource
1666
1469
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1667
 
 
1668
 
 
1669
 
class TestReCompile(tests.TestCase):
1670
 
 
1671
 
    def test_re_compile_checked(self):
1672
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1673
 
        self.assertTrue(r.match('aaaa'))
1674
 
        self.assertTrue(r.match('aAaA'))
1675
 
 
1676
 
    def test_re_compile_checked_error(self):
1677
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1678
 
        err = self.assertRaises(
1679
 
            errors.BzrCommandError,
1680
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1681
 
        self.assertEqual(
1682
 
            "Invalid regular expression in test case: '*': "
1683
 
            "nothing to repeat",
1684
 
            str(err))
1685
 
 
1686
 
 
1687
 
class TestDirReader(tests.TestCaseInTempDir):
1688
 
 
1689
 
    # Set by load_tests
1690
 
    _dir_reader_class = None
1691
 
    _native_to_unicode = None
1692
 
 
1693
 
    def setUp(self):
1694
 
        tests.TestCaseInTempDir.setUp(self)
1695
 
        self.overrideAttr(osutils,
1696
 
                          '_selected_dir_reader', self._dir_reader_class())
1697
 
 
1698
 
    def _get_ascii_tree(self):
1699
 
        tree = [
1700
 
            '0file',
1701
 
            '1dir/',
1702
 
            '1dir/0file',
1703
 
            '1dir/1dir/',
1704
 
            '2file'
1705
 
            ]
1706
 
        expected_dirblocks = [
1707
 
                (('', '.'),
1708
 
                 [('0file', '0file', 'file'),
1709
 
                  ('1dir', '1dir', 'directory'),
1710
 
                  ('2file', '2file', 'file'),
1711
 
                 ]
1712
 
                ),
1713
 
                (('1dir', './1dir'),
1714
 
                 [('1dir/0file', '0file', 'file'),
1715
 
                  ('1dir/1dir', '1dir', 'directory'),
1716
 
                 ]
1717
 
                ),
1718
 
                (('1dir/1dir', './1dir/1dir'),
1719
 
                 [
1720
 
                 ]
1721
 
                ),
1722
 
            ]
1723
 
        return tree, expected_dirblocks
1724
 
 
1725
 
    def test_walk_cur_dir(self):
1726
 
        tree, expected_dirblocks = self._get_ascii_tree()
1727
 
        self.build_tree(tree)
1728
 
        result = list(osutils._walkdirs_utf8('.'))
1729
 
        # Filter out stat and abspath
1730
 
        self.assertEqual(expected_dirblocks,
1731
 
                         [(dirinfo, [line[0:3] for line in block])
1732
 
                          for dirinfo, block in result])
1733
 
 
1734
 
    def test_walk_sub_dir(self):
1735
 
        tree, expected_dirblocks = self._get_ascii_tree()
1736
 
        self.build_tree(tree)
1737
 
        # you can search a subdir only, with a supplied prefix.
1738
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1739
 
        # Filter out stat and abspath
1740
 
        self.assertEqual(expected_dirblocks[1:],
1741
 
                         [(dirinfo, [line[0:3] for line in block])
1742
 
                          for dirinfo, block in result])
1743
 
 
1744
 
    def _get_unicode_tree(self):
1745
 
        name0u = u'0file-\xb6'
1746
 
        name1u = u'1dir-\u062c\u0648'
1747
 
        name2u = u'2file-\u0633'
1748
 
        tree = [
1749
 
            name0u,
1750
 
            name1u + '/',
1751
 
            name1u + '/' + name0u,
1752
 
            name1u + '/' + name1u + '/',
1753
 
            name2u,
1754
 
            ]
1755
 
        name0 = name0u.encode('UTF-8')
1756
 
        name1 = name1u.encode('UTF-8')
1757
 
        name2 = name2u.encode('UTF-8')
1758
 
        expected_dirblocks = [
1759
 
                (('', '.'),
1760
 
                 [(name0, name0, 'file', './' + name0u),
1761
 
                  (name1, name1, 'directory', './' + name1u),
1762
 
                  (name2, name2, 'file', './' + name2u),
1763
 
                 ]
1764
 
                ),
1765
 
                ((name1, './' + name1u),
1766
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1767
 
                                                        + '/' + name0u),
1768
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1769
 
                                                            + '/' + name1u),
1770
 
                 ]
1771
 
                ),
1772
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1773
 
                 [
1774
 
                 ]
1775
 
                ),
1776
 
            ]
1777
 
        return tree, expected_dirblocks
1778
 
 
1779
 
    def _filter_out(self, raw_dirblocks):
1780
 
        """Filter out a walkdirs_utf8 result.
1781
 
 
1782
 
        stat field is removed, all native paths are converted to unicode
1783
 
        """
1784
 
        filtered_dirblocks = []
1785
 
        for dirinfo, block in raw_dirblocks:
1786
 
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1787
 
            details = []
1788
 
            for line in block:
1789
 
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1790
 
            filtered_dirblocks.append((dirinfo, details))
1791
 
        return filtered_dirblocks
1792
 
 
1793
 
    def test_walk_unicode_tree(self):
1794
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1795
 
        tree, expected_dirblocks = self._get_unicode_tree()
1796
 
        self.build_tree(tree)
1797
 
        result = list(osutils._walkdirs_utf8('.'))
1798
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1799
 
 
1800
 
    def test_symlink(self):
1801
 
        self.requireFeature(tests.SymlinkFeature)
1802
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1803
 
        target = u'target\N{Euro Sign}'
1804
 
        link_name = u'l\N{Euro Sign}nk'
1805
 
        os.symlink(target, link_name)
1806
 
        target_utf8 = target.encode('UTF-8')
1807
 
        link_name_utf8 = link_name.encode('UTF-8')
1808
 
        expected_dirblocks = [
1809
 
                (('', '.'),
1810
 
                 [(link_name_utf8, link_name_utf8,
1811
 
                   'symlink', './' + link_name),],
1812
 
                 )]
1813
 
        result = list(osutils._walkdirs_utf8('.'))
1814
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1815
 
 
1816
 
 
1817
 
class TestReadLink(tests.TestCaseInTempDir):
1818
 
    """Exposes os.readlink() problems and the osutils solution.
1819
 
 
1820
 
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
1821
 
    unicode string will be returned if a unicode string is passed.
1822
 
 
1823
 
    But prior python versions failed to properly encode the passed unicode
1824
 
    string.
1825
 
    """
1826
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1827
 
 
1828
 
    def setUp(self):
1829
 
        super(tests.TestCaseInTempDir, self).setUp()
1830
 
        self.link = u'l\N{Euro Sign}ink'
1831
 
        self.target = u'targe\N{Euro Sign}t'
1832
 
        os.symlink(self.target, self.link)
1833
 
 
1834
 
    def test_os_readlink_link_encoding(self):
1835
 
        if sys.version_info < (2, 6):
1836
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1837
 
        else:
1838
 
            self.assertEquals(self.target,  os.readlink(self.link))
1839
 
 
1840
 
    def test_os_readlink_link_decoding(self):
1841
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
1842
 
                          os.readlink(self.link.encode(osutils._fs_enc)))
1843
 
 
1844
 
 
1845
 
class TestConcurrency(tests.TestCase):
1846
 
 
1847
 
    def setUp(self):
1848
 
        super(TestConcurrency, self).setUp()
1849
 
        self.overrideAttr(osutils, '_cached_local_concurrency')
1850
 
 
1851
 
    def test_local_concurrency(self):
1852
 
        concurrency = osutils.local_concurrency()
1853
 
        self.assertIsInstance(concurrency, int)
1854
 
 
1855
 
    def test_local_concurrency_environment_variable(self):
1856
 
        os.environ['BZR_CONCURRENCY'] = '2'
1857
 
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1858
 
        os.environ['BZR_CONCURRENCY'] = '3'
1859
 
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1860
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
1861
 
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1862
 
 
1863
 
    def test_option_concurrency(self):
1864
 
        os.environ['BZR_CONCURRENCY'] = '1'
1865
 
        self.run_bzr('rocks --concurrency 42')
1866
 
        # Command line overrides environment variable
1867
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1868
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1869
 
 
1870
 
 
1871
 
class TestFailedToLoadExtension(tests.TestCase):
1872
 
 
1873
 
    def _try_loading(self):
1874
 
        try:
1875
 
            import bzrlib._fictional_extension_py
1876
 
        except ImportError, e:
1877
 
            osutils.failed_to_load_extension(e)
1878
 
            return True
1879
 
 
1880
 
    def setUp(self):
1881
 
        super(TestFailedToLoadExtension, self).setUp()
1882
 
        self.overrideAttr(osutils, '_extension_load_failures', [])
1883
 
 
1884
 
    def test_failure_to_load(self):
1885
 
        self._try_loading()
1886
 
        self.assertLength(1, osutils._extension_load_failures)
1887
 
        self.assertEquals(osutils._extension_load_failures[0],
1888
 
            "No module named _fictional_extension_py")
1889
 
 
1890
 
    def test_report_extension_load_failures_no_warning(self):
1891
 
        self.assertTrue(self._try_loading())
1892
 
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1893
 
        # it used to give a Python warning; it no longer does
1894
 
        self.assertLength(0, warnings)
1895
 
 
1896
 
    def test_report_extension_load_failures_message(self):
1897
 
        log = StringIO()
1898
 
        trace.push_log_file(log)
1899
 
        self.assertTrue(self._try_loading())
1900
 
        osutils.report_extension_load_failures()
1901
 
        self.assertContainsRe(
1902
 
            log.getvalue(),
1903
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1904
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1905
 
            )
1906
 
 
1907
 
 
1908
 
class TestTerminalWidth(tests.TestCase):
    """Tests for osutils.terminal_width under different environments."""

    def replace_stdout(self, new):
        """Override sys.stdout with ``new`` via overrideAttr."""
        self.overrideAttr(sys, 'stdout', new)

    def replace__terminal_size(self, new):
        """Override osutils._terminal_size with ``new`` via overrideAttr."""
        self.overrideAttr(osutils, '_terminal_size', new)

    def set_fake_tty(self):
        """Install a stdout replacement whose isatty() always returns True."""

        class _AlwaysATty(object):

            def isatty(self):
                return True

        fake_stdout = _AlwaysATty()
        self.replace_stdout(fake_stdout)

    def test_default_values(self):
        """The default terminal width is 80."""
        self.assertEqual(80, osutils.default_terminal_width)

    def test_defaults_to_BZR_COLUMNS(self):
        """terminal_width() follows BZR_COLUMNS when it is set."""
        env = os.environ
        # BZR_COLUMNS is set by the test framework
        self.assertNotEqual('12', env['BZR_COLUMNS'])
        env['BZR_COLUMNS'] = '12'
        self.assertEqual(12, osutils.terminal_width())

    def test_falls_back_to_COLUMNS(self):
        """Without BZR_COLUMNS, a tty stdout makes COLUMNS the width."""
        env = os.environ
        del env['BZR_COLUMNS']
        self.assertNotEqual('42', env['COLUMNS'])
        self.set_fake_tty()
        env['COLUMNS'] = '42'
        self.assertEqual(42, osutils.terminal_width())

    def test_tty_default_without_columns(self):
        """With no column variables, a tty uses osutils._terminal_size."""
        for name in ('BZR_COLUMNS', 'COLUMNS'):
            del os.environ[name]
        self.set_fake_tty()
        # We need to override the osutils definition as it depends on the
        # running environment that we can't control (PQM running without a
        # controlling terminal is one example).
        self.replace__terminal_size(lambda w, h: (42, 42))
        self.assertEqual(42, osutils.terminal_width())

    def test_non_tty_default_without_columns(self):
        """With no column variables and stdout None, the width is None."""
        for name in ('BZR_COLUMNS', 'COLUMNS'):
            del os.environ[name]
        self.replace_stdout(None)
        self.assertEqual(None, osutils.terminal_width())

    def test_no_TIOCGWINSZ(self):
        """terminal_width() must not raise when termios lacks TIOCGWINSZ."""
        self.requireFeature(term_ios_feature)
        termios = term_ios_feature.module
        # bug 63539 is about a termios without TIOCGWINSZ attribute
        if hasattr(termios, 'TIOCGWINSZ'):
            # Register the override first, then drop the attribute for the
            # duration of the test.
            self.overrideAttr(termios, 'TIOCGWINSZ')
            del termios.TIOCGWINSZ
        # else: we won't remove TIOCGWINSZ, because it doesn't exist anyway
        for name in ('BZR_COLUMNS', 'COLUMNS'):
            del os.environ[name]
        # Whatever the result is, if we don't raise an exception, it's ok.
        osutils.terminal_width()