~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Mark Hammond
  • Date: 2009-01-12 01:55:34 UTC
  • mto: (3995.8.2 prepare-1.12)
  • mto: This revision was merged to the branch mainline in revision 4007.
  • Revision ID: mhammond@skippinet.com.au-20090112015534-yfxg50p7mpds9j4v
Include all .html files from the tortoise doc directory.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
19
19
from cStringIO import StringIO
20
20
import errno
21
21
import os
22
 
import re
23
22
import socket
24
23
import stat
25
24
import sys
29
28
    errors,
30
29
    osutils,
31
30
    tests,
32
 
    trace,
33
31
    win32utils,
34
32
    )
 
33
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
34
from bzrlib.osutils import (
 
35
        is_inside_any,
 
36
        is_inside_or_parent_of_any,
 
37
        pathjoin,
 
38
        pumpfile,
 
39
        pump_string_file,
 
40
        canonical_relpath,
 
41
        )
35
42
from bzrlib.tests import (
36
 
    file_utils,
37
 
    test__walkdirs_win32,
 
43
        adapt_tests,
 
44
        Feature,
 
45
        probe_unicode_in_user_encoding,
 
46
        split_suite_by_re,
 
47
        StringIOWrapper,
 
48
        SymlinkFeature,
 
49
        CaseInsCasePresFilenameFeature,
 
50
        TestCase,
 
51
        TestCaseInTempDir,
 
52
        TestScenarioApplier,
 
53
        TestSkipped,
 
54
        )
 
55
from bzrlib.tests.file_utils import (
 
56
    FakeReadFile,
38
57
    )
39
 
 
40
 
 
41
 
class _UTF8DirReaderFeature(tests.Feature):
 
58
from bzrlib.tests.test__walkdirs_win32 import Win32ReadDirFeature
 
59
 
 
60
 
 
61
class _UTF8DirReaderFeature(Feature):
42
62
 
43
63
    def _probe(self):
44
64
        try:
53
73
 
54
74
UTF8DirReaderFeature = _UTF8DirReaderFeature()
55
75
 
56
 
term_ios_feature = tests.ModuleAvailableFeature('termios')
57
 
 
58
 
 
59
 
def _already_unicode(s):
60
 
    return s
61
 
 
62
 
 
63
 
def _utf8_to_unicode(s):
64
 
    return s.decode('UTF-8')
65
 
 
66
 
 
67
 
def dir_reader_scenarios():
68
 
    # For each dir reader we define:
69
 
 
70
 
    # - native_to_unicode: a function converting the native_abspath as returned
71
 
    #   by DirReader.read_dir to its unicode representation
72
 
 
73
 
    # UnicodeDirReader is the fallback, it should be tested on all platforms.
74
 
    scenarios = [('unicode',
75
 
                  dict(_dir_reader_class=osutils.UnicodeDirReader,
76
 
                       _native_to_unicode=_already_unicode))]
77
 
    # Some DirReaders are platform specific and even there they may not be
78
 
    # available.
79
 
    if UTF8DirReaderFeature.available():
80
 
        from bzrlib import _readdir_pyx
81
 
        scenarios.append(('utf8',
82
 
                          dict(_dir_reader_class=_readdir_pyx.UTF8DirReader,
83
 
                               _native_to_unicode=_utf8_to_unicode)))
84
 
 
85
 
    if test__walkdirs_win32.win32_readdir_feature.available():
86
 
        try:
87
 
            from bzrlib import _walkdirs_win32
88
 
            scenarios.append(
89
 
                ('win32',
90
 
                 dict(_dir_reader_class=_walkdirs_win32.Win32ReadDir,
91
 
                      _native_to_unicode=_already_unicode)))
92
 
        except ImportError:
93
 
            pass
94
 
    return scenarios
95
 
 
96
 
 
97
 
def load_tests(basic_tests, module, loader):
98
 
    suite = loader.suiteClass()
99
 
    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
100
 
        basic_tests, tests.condition_isinstance(TestDirReader))
101
 
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios(), suite)
102
 
    suite.addTest(remaining_tests)
103
 
    return suite
104
 
 
105
 
 
106
 
class TestContainsWhitespace(tests.TestCase):
 
76
 
 
77
class TestOSUtils(TestCaseInTempDir):
107
78
 
108
79
    def test_contains_whitespace(self):
109
80
        self.failUnless(osutils.contains_whitespace(u' '))
119
90
        self.failIf(osutils.contains_whitespace(u'hellothere'))
120
91
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
121
92
 
122
 
 
123
 
class TestRename(tests.TestCaseInTempDir):
124
 
 
125
 
    def create_file(self, filename, content):
126
 
        f = open(filename, 'wb')
127
 
        try:
128
 
            f.write(content)
129
 
        finally:
130
 
            f.close()
131
 
 
132
 
    def _fancy_rename(self, a, b):
133
 
        osutils.fancy_rename(a, b, rename_func=os.rename,
134
 
                             unlink_func=os.unlink)
135
 
 
136
93
    def test_fancy_rename(self):
137
94
        # This should work everywhere
138
 
        self.create_file('a', 'something in a\n')
139
 
        self._fancy_rename('a', 'b')
 
95
        def rename(a, b):
 
96
            osutils.fancy_rename(a, b,
 
97
                    rename_func=os.rename,
 
98
                    unlink_func=os.unlink)
 
99
 
 
100
        open('a', 'wb').write('something in a\n')
 
101
        rename('a', 'b')
140
102
        self.failIfExists('a')
141
103
        self.failUnlessExists('b')
142
104
        self.check_file_contents('b', 'something in a\n')
143
105
 
144
 
        self.create_file('a', 'new something in a\n')
145
 
        self._fancy_rename('b', 'a')
 
106
        open('a', 'wb').write('new something in a\n')
 
107
        rename('b', 'a')
146
108
 
147
109
        self.check_file_contents('a', 'something in a\n')
148
110
 
149
 
    def test_fancy_rename_fails_source_missing(self):
150
 
        # An exception should be raised, and the target should be left in place
151
 
        self.create_file('target', 'data in target\n')
152
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
153
 
                          'missingsource', 'target')
154
 
        self.failUnlessExists('target')
155
 
        self.check_file_contents('target', 'data in target\n')
156
 
 
157
 
    def test_fancy_rename_fails_if_source_and_target_missing(self):
158
 
        self.assertRaises((IOError, OSError), self._fancy_rename,
159
 
                          'missingsource', 'missingtarget')
160
 
 
161
111
    def test_rename(self):
162
112
        # Rename should be semi-atomic on all platforms
163
 
        self.create_file('a', 'something in a\n')
 
113
        open('a', 'wb').write('something in a\n')
164
114
        osutils.rename('a', 'b')
165
115
        self.failIfExists('a')
166
116
        self.failUnlessExists('b')
167
117
        self.check_file_contents('b', 'something in a\n')
168
118
 
169
 
        self.create_file('a', 'new something in a\n')
 
119
        open('a', 'wb').write('new something in a\n')
170
120
        osutils.rename('b', 'a')
171
121
 
172
122
        self.check_file_contents('a', 'something in a\n')
183
133
        shape = sorted(os.listdir('.'))
184
134
        self.assertEquals(['A', 'B'], shape)
185
135
 
186
 
 
187
 
class TestRandChars(tests.TestCase):
188
 
 
189
136
    def test_01_rand_chars_empty(self):
190
137
        result = osutils.rand_chars(0)
191
138
        self.assertEqual(result, '')
196
143
        self.assertEqual(type(result), str)
197
144
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
198
145
 
199
 
 
200
 
class TestIsInside(tests.TestCase):
201
 
 
202
146
    def test_is_inside(self):
203
147
        is_inside = osutils.is_inside
204
148
        self.assertTrue(is_inside('src', 'src/foo.c'))
209
153
        self.assertTrue(is_inside('', 'foo.c'))
210
154
 
211
155
    def test_is_inside_any(self):
212
 
        SRC_FOO_C = osutils.pathjoin('src', 'foo.c')
 
156
        SRC_FOO_C = pathjoin('src', 'foo.c')
213
157
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
214
158
                         (['src'], SRC_FOO_C),
215
159
                         (['src'], 'src'),
216
160
                         ]:
217
 
            self.assert_(osutils.is_inside_any(dirs, fn))
 
161
            self.assert_(is_inside_any(dirs, fn))
218
162
        for dirs, fn in [(['src'], 'srccontrol'),
219
163
                         (['src'], 'srccontrol/foo')]:
220
 
            self.assertFalse(osutils.is_inside_any(dirs, fn))
 
164
            self.assertFalse(is_inside_any(dirs, fn))
221
165
 
222
166
    def test_is_inside_or_parent_of_any(self):
223
167
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
226
170
                         (['src/bar.c', 'bla/foo.c'], 'src'),
227
171
                         (['src'], 'src'),
228
172
                         ]:
229
 
            self.assert_(osutils.is_inside_or_parent_of_any(dirs, fn))
230
 
 
 
173
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
174
            
231
175
        for dirs, fn in [(['src'], 'srccontrol'),
232
176
                         (['srccontrol/foo.c'], 'src'),
233
177
                         (['src'], 'srccontrol/foo')]:
234
 
            self.assertFalse(osutils.is_inside_or_parent_of_any(dirs, fn))
235
 
 
236
 
 
237
 
class TestRmTree(tests.TestCaseInTempDir):
 
178
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
238
179
 
239
180
    def test_rmtree(self):
240
181
        # Check to remove tree with read-only files/dirs
254
195
        self.failIfExists('dir/file')
255
196
        self.failIfExists('dir')
256
197
 
257
 
 
258
 
class TestDeleteAny(tests.TestCaseInTempDir):
259
 
 
260
 
    def test_delete_any_readonly(self):
261
 
        # from <https://bugs.launchpad.net/bzr/+bug/218206>
262
 
        self.build_tree(['d/', 'f'])
263
 
        osutils.make_readonly('d')
264
 
        osutils.make_readonly('f')
265
 
 
266
 
        osutils.delete_any('f')
267
 
        osutils.delete_any('d')
268
 
 
269
 
 
270
 
class TestKind(tests.TestCaseInTempDir):
271
 
 
272
198
    def test_file_kind(self):
273
199
        self.build_tree(['file', 'dir/'])
274
200
        self.assertEquals('file', osutils.file_kind('file'))
276
202
        if osutils.has_symlinks():
277
203
            os.symlink('symlink', 'symlink')
278
204
            self.assertEquals('symlink', osutils.file_kind('symlink'))
279
 
 
 
205
        
280
206
        # TODO: jam 20060529 Test a block device
281
207
        try:
282
208
            os.lstat('/dev/null')
304
230
                os.remove('socket')
305
231
 
306
232
    def test_kind_marker(self):
307
 
        self.assertEqual("", osutils.kind_marker("file"))
308
 
        self.assertEqual("/", osutils.kind_marker('directory'))
309
 
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
310
 
        self.assertEqual("@", osutils.kind_marker("symlink"))
311
 
        self.assertEqual("+", osutils.kind_marker("tree-reference"))
312
 
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
313
 
 
314
 
 
315
 
class TestUmask(tests.TestCaseInTempDir):
 
233
        self.assertEqual(osutils.kind_marker('file'), '')
 
234
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
235
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
236
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
316
237
 
317
238
    def test_get_umask(self):
318
239
        if sys.platform == 'win32':
321
242
            return
322
243
 
323
244
        orig_umask = osutils.get_umask()
324
 
        self.addCleanup(os.umask, orig_umask)
325
 
        os.umask(0222)
326
 
        self.assertEqual(0222, osutils.get_umask())
327
 
        os.umask(0022)
328
 
        self.assertEqual(0022, osutils.get_umask())
329
 
        os.umask(0002)
330
 
        self.assertEqual(0002, osutils.get_umask())
331
 
        os.umask(0027)
332
 
        self.assertEqual(0027, osutils.get_umask())
333
 
 
334
 
 
335
 
class TestDateTime(tests.TestCase):
 
245
        try:
 
246
            os.umask(0222)
 
247
            self.assertEqual(0222, osutils.get_umask())
 
248
            os.umask(0022)
 
249
            self.assertEqual(0022, osutils.get_umask())
 
250
            os.umask(0002)
 
251
            self.assertEqual(0002, osutils.get_umask())
 
252
            os.umask(0027)
 
253
            self.assertEqual(0027, osutils.get_umask())
 
254
        finally:
 
255
            os.umask(orig_umask)
336
256
 
337
257
    def assertFormatedDelta(self, expected, seconds):
338
258
        """Assert osutils.format_delta formats as expected"""
380
300
        # Instead blackbox.test_locale should check for localized
381
301
        # dates once they do occur in output strings.
382
302
 
383
 
    def test_format_date_with_offset_in_original_timezone(self):
384
 
        self.assertEqual("Thu 1970-01-01 00:00:00 +0000",
385
 
            osutils.format_date_with_offset_in_original_timezone(0))
386
 
        self.assertEqual("Fri 1970-01-02 03:46:40 +0000",
387
 
            osutils.format_date_with_offset_in_original_timezone(100000))
388
 
        self.assertEqual("Fri 1970-01-02 05:46:40 +0200",
389
 
            osutils.format_date_with_offset_in_original_timezone(100000, 7200))
390
 
 
391
 
    def test_local_time_offset(self):
392
 
        """Test that local_time_offset() returns a sane value."""
393
 
        offset = osutils.local_time_offset()
394
 
        self.assertTrue(isinstance(offset, int))
395
 
        # Test that the offset is no more than a eighteen hours in
396
 
        # either direction.
397
 
        # Time zone handling is system specific, so it is difficult to
398
 
        # do more specific tests, but a value outside of this range is
399
 
        # probably wrong.
400
 
        eighteen_hours = 18 * 3600
401
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
402
 
 
403
 
    def test_local_time_offset_with_timestamp(self):
404
 
        """Test that local_time_offset() works with a timestamp."""
405
 
        offset = osutils.local_time_offset(1000000000.1234567)
406
 
        self.assertTrue(isinstance(offset, int))
407
 
        eighteen_hours = 18 * 3600
408
 
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
409
 
 
410
 
 
411
 
class TestLinks(tests.TestCaseInTempDir):
412
 
 
413
303
    def test_dereference_path(self):
414
 
        self.requireFeature(tests.SymlinkFeature)
 
304
        self.requireFeature(SymlinkFeature)
415
305
        cwd = osutils.realpath('.')
416
306
        os.mkdir('bar')
417
307
        bar_path = osutils.pathjoin(cwd, 'bar')
420
310
        self.assertEqual(bar_path, osutils.realpath('./bar'))
421
311
        os.symlink('bar', 'foo')
422
312
        self.assertEqual(bar_path, osutils.realpath('./foo'))
423
 
 
 
313
        
424
314
        # Does not dereference terminal symlinks
425
315
        foo_path = osutils.pathjoin(cwd, 'foo')
426
316
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
458
348
            osutils.make_readonly('dangling')
459
349
            osutils.make_writable('dangling')
460
350
 
 
351
    def test_kind_marker(self):
 
352
        self.assertEqual("", osutils.kind_marker("file"))
 
353
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
354
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
355
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
356
 
461
357
    def test_host_os_dereferences_symlinks(self):
462
358
        osutils.host_os_dereferences_symlinks()
463
359
 
464
360
 
465
 
class TestCanonicalRelPath(tests.TestCaseInTempDir):
 
361
class TestCanonicalRelPath(TestCaseInTempDir):
466
362
 
467
 
    _test_needs_features = [tests.CaseInsCasePresFilenameFeature]
 
363
    _test_needs_features = [CaseInsCasePresFilenameFeature]
468
364
 
469
365
    def test_canonical_relpath_simple(self):
470
366
        f = file('MixedCaseName', 'w')
471
367
        f.close()
472
 
        actual = osutils.canonical_relpath(self.test_base_dir, 'mixedcasename')
473
 
        self.failUnlessEqual('work/MixedCaseName', actual)
 
368
        self.failUnlessEqual(
 
369
            canonical_relpath(self.test_base_dir, 'mixedcasename'),
 
370
            'work/MixedCaseName')
474
371
 
475
372
    def test_canonical_relpath_missing_tail(self):
476
373
        os.mkdir('MixedCaseParent')
477
 
        actual = osutils.canonical_relpath(self.test_base_dir,
478
 
                                           'mixedcaseparent/nochild')
479
 
        self.failUnlessEqual('work/MixedCaseParent/nochild', actual)
480
 
 
481
 
 
482
 
class Test_CICPCanonicalRelpath(tests.TestCaseWithTransport):
483
 
 
484
 
    def assertRelpath(self, expected, base, path):
485
 
        actual = osutils._cicp_canonical_relpath(base, path)
486
 
        self.assertEqual(expected, actual)
487
 
 
488
 
    def test_simple(self):
489
 
        self.build_tree(['MixedCaseName'])
490
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
491
 
        self.assertRelpath('MixedCaseName', base, 'mixedcAsename')
492
 
 
493
 
    def test_subdir_missing_tail(self):
494
 
        self.build_tree(['MixedCaseParent/', 'MixedCaseParent/a_child'])
495
 
        base = osutils.realpath(self.get_transport('.').local_abspath('.'))
496
 
        self.assertRelpath('MixedCaseParent/a_child', base,
497
 
                           'MixedCaseParent/a_child')
498
 
        self.assertRelpath('MixedCaseParent/a_child', base,
499
 
                           'MixedCaseParent/A_Child')
500
 
        self.assertRelpath('MixedCaseParent/not_child', base,
501
 
                           'MixedCaseParent/not_child')
502
 
 
503
 
    def test_at_root_slash(self):
504
 
        # We can't test this on Windows, because it has a 'MIN_ABS_PATHLENGTH'
505
 
        # check...
506
 
        if osutils.MIN_ABS_PATHLENGTH > 1:
507
 
            raise tests.TestSkipped('relpath requires %d chars'
508
 
                                    % osutils.MIN_ABS_PATHLENGTH)
509
 
        self.assertRelpath('foo', '/', '/foo')
510
 
 
511
 
    def test_at_root_drive(self):
512
 
        if sys.platform != 'win32':
513
 
            raise tests.TestNotApplicable('we can only test drive-letter relative'
514
 
                                          ' paths on Windows where we have drive'
515
 
                                          ' letters.')
516
 
        # see bug #322807
517
 
        # The specific issue is that when at the root of a drive, 'abspath'
518
 
        # returns "C:/" or just "/". However, the code assumes that abspath
519
 
        # always returns something like "C:/foo" or "/foo" (no trailing slash).
520
 
        self.assertRelpath('foo', 'C:/', 'C:/foo')
521
 
        self.assertRelpath('foo', 'X:/', 'X:/foo')
522
 
        self.assertRelpath('foo', 'X:/', 'X://foo')
523
 
 
524
 
 
525
 
class TestPumpFile(tests.TestCase):
 
374
        self.failUnlessEqual(
 
375
            canonical_relpath(self.test_base_dir, 'mixedcaseparent/nochild'),
 
376
            'work/MixedCaseParent/nochild')
 
377
 
 
378
 
 
379
class TestPumpFile(TestCase):
526
380
    """Test pumpfile method."""
527
 
 
528
381
    def setUp(self):
529
 
        tests.TestCase.setUp(self)
530
382
        # create a test datablock
531
383
        self.block_size = 512
532
384
        pattern = '0123456789ABCDEF'
539
391
        # make sure test data is larger than max read size
540
392
        self.assertTrue(self.test_data_len > self.block_size)
541
393
 
542
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
394
        from_file = FakeReadFile(self.test_data)
543
395
        to_file = StringIO()
544
396
 
545
397
        # read (max / 2) bytes and verify read size wasn't affected
546
398
        num_bytes_to_read = self.block_size / 2
547
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
399
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
548
400
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
549
401
        self.assertEqual(from_file.get_read_count(), 1)
550
402
 
551
403
        # read (max) bytes and verify read size wasn't affected
552
404
        num_bytes_to_read = self.block_size
553
405
        from_file.reset_read_count()
554
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
406
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
555
407
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
556
408
        self.assertEqual(from_file.get_read_count(), 1)
557
409
 
558
410
        # read (max + 1) bytes and verify read size was limited
559
411
        num_bytes_to_read = self.block_size + 1
560
412
        from_file.reset_read_count()
561
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
413
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
562
414
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
563
415
        self.assertEqual(from_file.get_read_count(), 2)
564
416
 
565
417
        # finish reading the rest of the data
566
418
        num_bytes_to_read = self.test_data_len - to_file.tell()
567
 
        osutils.pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
419
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
568
420
 
569
421
        # report error if the data wasn't equal (we only report the size due
570
422
        # to the length of the data)
580
432
        self.assertTrue(self.test_data_len > self.block_size)
581
433
 
582
434
        # retrieve data in blocks
583
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
435
        from_file = FakeReadFile(self.test_data)
584
436
        to_file = StringIO()
585
 
        osutils.pumpfile(from_file, to_file, self.test_data_len,
586
 
                         self.block_size)
 
437
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
587
438
 
588
439
        # verify read size was equal to the maximum read size
589
440
        self.assertTrue(from_file.get_max_read_size() > 0)
604
455
        self.assertTrue(self.test_data_len > self.block_size)
605
456
 
606
457
        # retrieve data to EOF
607
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
458
        from_file = FakeReadFile(self.test_data)
608
459
        to_file = StringIO()
609
 
        osutils.pumpfile(from_file, to_file, -1, self.block_size)
 
460
        pumpfile(from_file, to_file, -1, self.block_size)
610
461
 
611
462
        # verify read size was equal to the maximum read size
612
463
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
624
475
        test verifies that any existing usages of pumpfile will not be broken
625
476
        with this new version."""
626
477
        # retrieve data using default (old) pumpfile method
627
 
        from_file = file_utils.FakeReadFile(self.test_data)
 
478
        from_file = FakeReadFile(self.test_data)
628
479
        to_file = StringIO()
629
 
        osutils.pumpfile(from_file, to_file)
 
480
        pumpfile(from_file, to_file)
630
481
 
631
482
        # report error if the data wasn't equal (we only report the size due
632
483
        # to the length of the data)
635
486
            message = "Data not equal.  Expected %d bytes, received %d."
636
487
            self.fail(message % (len(response_data), self.test_data_len))
637
488
 
638
 
    def test_report_activity(self):
639
 
        activity = []
640
 
        def log_activity(length, direction):
641
 
            activity.append((length, direction))
642
 
        from_file = StringIO(self.test_data)
643
 
        to_file = StringIO()
644
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
645
 
                         report_activity=log_activity, direction='read')
646
 
        self.assertEqual([(500, 'read'), (500, 'read'), (500, 'read'),
647
 
                          (36, 'read')], activity)
648
 
 
649
 
        from_file = StringIO(self.test_data)
650
 
        to_file = StringIO()
651
 
        del activity[:]
652
 
        osutils.pumpfile(from_file, to_file, buff_size=500,
653
 
                         report_activity=log_activity, direction='write')
654
 
        self.assertEqual([(500, 'write'), (500, 'write'), (500, 'write'),
655
 
                          (36, 'write')], activity)
656
 
 
657
 
        # And with a limited amount of data
658
 
        from_file = StringIO(self.test_data)
659
 
        to_file = StringIO()
660
 
        del activity[:]
661
 
        osutils.pumpfile(from_file, to_file, buff_size=500, read_length=1028,
662
 
                         report_activity=log_activity, direction='read')
663
 
        self.assertEqual([(500, 'read'), (500, 'read'), (28, 'read')], activity)
664
 
 
665
 
 
666
 
 
667
 
class TestPumpStringFile(tests.TestCase):
 
489
 
 
490
class TestPumpStringFile(TestCase):
668
491
 
669
492
    def test_empty(self):
670
493
        output = StringIO()
671
 
        osutils.pump_string_file("", output)
 
494
        pump_string_file("", output)
672
495
        self.assertEqual("", output.getvalue())
673
496
 
674
497
    def test_more_than_segment_size(self):
675
498
        output = StringIO()
676
 
        osutils.pump_string_file("123456789", output, 2)
 
499
        pump_string_file("123456789", output, 2)
677
500
        self.assertEqual("123456789", output.getvalue())
678
501
 
679
502
    def test_segment_size(self):
680
503
        output = StringIO()
681
 
        osutils.pump_string_file("12", output, 2)
 
504
        pump_string_file("12", output, 2)
682
505
        self.assertEqual("12", output.getvalue())
683
506
 
684
507
    def test_segment_size_multiple(self):
685
508
        output = StringIO()
686
 
        osutils.pump_string_file("1234", output, 2)
 
509
        pump_string_file("1234", output, 2)
687
510
        self.assertEqual("1234", output.getvalue())
688
511
 
689
512
 
690
 
class TestRelpath(tests.TestCase):
691
 
 
692
 
    def test_simple_relpath(self):
693
 
        cwd = osutils.getcwd()
694
 
        subdir = cwd + '/subdir'
695
 
        self.assertEqual('subdir', osutils.relpath(cwd, subdir))
696
 
 
697
 
    def test_deep_relpath(self):
698
 
        cwd = osutils.getcwd()
699
 
        subdir = cwd + '/sub/subsubdir'
700
 
        self.assertEqual('sub/subsubdir', osutils.relpath(cwd, subdir))
701
 
 
702
 
    def test_not_relative(self):
703
 
        self.assertRaises(errors.PathNotChild,
704
 
                          osutils.relpath, 'C:/path', 'H:/path')
705
 
        self.assertRaises(errors.PathNotChild,
706
 
                          osutils.relpath, 'C:/', 'H:/path')
707
 
 
708
 
 
709
 
class TestSafeUnicode(tests.TestCase):
 
513
class TestSafeUnicode(TestCase):
710
514
 
711
515
    def test_from_ascii_string(self):
712
516
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
721
525
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
722
526
 
723
527
    def test_bad_utf8_string(self):
724
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
528
        self.assertRaises(BzrBadParameterNotUnicode,
725
529
                          osutils.safe_unicode,
726
530
                          '\xbb\xbb')
727
531
 
728
532
 
729
 
class TestSafeUtf8(tests.TestCase):
 
533
class TestSafeUtf8(TestCase):
730
534
 
731
535
    def test_from_ascii_string(self):
732
536
        f = 'foobar'
742
546
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
743
547
 
744
548
    def test_bad_utf8_string(self):
745
 
        self.assertRaises(errors.BzrBadParameterNotUnicode,
 
549
        self.assertRaises(BzrBadParameterNotUnicode,
746
550
                          osutils.safe_utf8, '\xbb\xbb')
747
551
 
748
552
 
749
 
class TestSafeRevisionId(tests.TestCase):
 
553
class TestSafeRevisionId(TestCase):
750
554
 
751
555
    def test_from_ascii_string(self):
752
556
        # this shouldn't give a warning because it's getting an ascii string
774
578
        self.assertEqual(None, osutils.safe_revision_id(None))
775
579
 
776
580
 
777
 
class TestSafeFileId(tests.TestCase):
 
581
class TestSafeFileId(TestCase):
778
582
 
779
583
    def test_from_ascii_string(self):
780
584
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
800
604
        self.assertEqual(None, osutils.safe_file_id(None))
801
605
 
802
606
 
803
 
class TestWin32Funcs(tests.TestCase):
804
 
    """Test that _win32 versions of os utilities return appropriate paths."""
 
607
class TestWin32Funcs(TestCase):
 
608
    """Test that the _win32 versions of os utilities return appropriate paths."""
805
609
 
806
610
    def test_abspath(self):
807
611
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
814
618
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
815
619
 
816
620
    def test_pathjoin(self):
817
 
        self.assertEqual('path/to/foo',
818
 
                         osutils._win32_pathjoin('path', 'to', 'foo'))
819
 
        self.assertEqual('C:/foo',
820
 
                         osutils._win32_pathjoin('path\\to', 'C:\\foo'))
821
 
        self.assertEqual('C:/foo',
822
 
                         osutils._win32_pathjoin('path/to', 'C:/foo'))
823
 
        self.assertEqual('path/to/foo',
824
 
                         osutils._win32_pathjoin('path/to/', 'foo'))
825
 
        self.assertEqual('/foo',
826
 
                         osutils._win32_pathjoin('C:/path/to/', '/foo'))
827
 
        self.assertEqual('/foo',
828
 
                         osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
621
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
622
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
623
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
624
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
625
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
626
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
829
627
 
830
628
    def test_normpath(self):
831
 
        self.assertEqual('path/to/foo',
832
 
                         osutils._win32_normpath(r'path\\from\..\to\.\foo'))
833
 
        self.assertEqual('path/to/foo',
834
 
                         osutils._win32_normpath('path//from/../to/./foo'))
 
629
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
630
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
835
631
 
836
632
    def test_getcwd(self):
837
633
        cwd = osutils._win32_getcwd()
866
662
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
867
663
 
868
664
 
869
 
class TestWin32FuncsDirs(tests.TestCaseInTempDir):
 
665
class TestWin32FuncsDirs(TestCaseInTempDir):
870
666
    """Test win32 functions that create files."""
 
667
    
 
668
    def test_getcwd(self):
 
669
        if win32utils.winver == 'Windows 98':
 
670
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
671
        # Make sure getcwd can handle unicode filenames
 
672
        try:
 
673
            os.mkdir(u'mu-\xb5')
 
674
        except UnicodeError:
 
675
            raise TestSkipped("Unable to create Unicode filename")
871
676
 
872
 
    def test_getcwd(self):
873
 
        self.requireFeature(tests.UnicodeFilenameFeature)
874
 
        os.mkdir(u'mu-\xb5')
875
677
        os.chdir(u'mu-\xb5')
876
678
        # TODO: jam 20060427 This will probably fail on Mac OSX because
877
679
        #       it will change the normalization of B\xe5gfors
882
684
    def test_minimum_path_selection(self):
883
685
        self.assertEqual(set(),
884
686
            osutils.minimum_path_selection([]))
885
 
        self.assertEqual(set(['a']),
886
 
            osutils.minimum_path_selection(['a']))
887
687
        self.assertEqual(set(['a', 'b']),
888
688
            osutils.minimum_path_selection(['a', 'b']))
889
689
        self.assertEqual(set(['a/', 'b']),
890
690
            osutils.minimum_path_selection(['a/', 'b']))
891
691
        self.assertEqual(set(['a/', 'b']),
892
692
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
893
 
        self.assertEqual(set(['a-b', 'a', 'a0b']),
894
 
            osutils.minimum_path_selection(['a-b', 'a/b', 'a0b', 'a']))
895
693
 
896
694
    def test_mkdtemp(self):
897
695
        tmpdir = osutils._win32_mkdtemp(dir='.')
953
751
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
954
752
 
955
753
 
956
 
class TestParentDirectories(tests.TestCaseInTempDir):
957
 
    """Test osutils.parent_directories()"""
958
 
 
959
 
    def test_parent_directories(self):
960
 
        self.assertEqual([], osutils.parent_directories('a'))
961
 
        self.assertEqual(['a'], osutils.parent_directories('a/b'))
962
 
        self.assertEqual(['a/b', 'a'], osutils.parent_directories('a/b/c'))
963
 
 
964
 
 
965
 
class TestMacFuncsDirs(tests.TestCaseInTempDir):
 
754
class TestMacFuncsDirs(TestCaseInTempDir):
966
755
    """Test mac special functions that require directories."""
967
756
 
968
757
    def test_getcwd(self):
969
 
        self.requireFeature(tests.UnicodeFilenameFeature)
970
 
        os.mkdir(u'B\xe5gfors')
 
758
        # On Mac, this will actually create Ba\u030agfors
 
759
        # but chdir will still work, because it accepts both paths
 
760
        try:
 
761
            os.mkdir(u'B\xe5gfors')
 
762
        except UnicodeError:
 
763
            raise TestSkipped("Unable to create Unicode filename")
 
764
 
971
765
        os.chdir(u'B\xe5gfors')
972
766
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
973
767
 
974
768
    def test_getcwd_nonnorm(self):
975
 
        self.requireFeature(tests.UnicodeFilenameFeature)
976
769
        # Test that _mac_getcwd() will normalize this path
977
 
        os.mkdir(u'Ba\u030agfors')
 
770
        try:
 
771
            os.mkdir(u'Ba\u030agfors')
 
772
        except UnicodeError:
 
773
            raise TestSkipped("Unable to create Unicode filename")
 
774
 
978
775
        os.chdir(u'Ba\u030agfors')
979
776
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
980
777
 
981
778
 
982
 
class TestChunksToLines(tests.TestCase):
 
779
class TestChunksToLines(TestCase):
983
780
 
984
781
    def test_smoketest(self):
985
782
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
987
784
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
988
785
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
989
786
 
990
 
    def test_osutils_binding(self):
991
 
        from bzrlib.tests import test__chunks_to_lines
992
 
        if test__chunks_to_lines.compiled_chunkstolines_feature.available():
 
787
    def test_is_compiled(self):
 
788
        from bzrlib.tests.test__chunks_to_lines import CompiledChunksToLinesFeature
 
789
        if CompiledChunksToLinesFeature:
993
790
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
994
791
        else:
995
792
            from bzrlib._chunks_to_lines_py import chunks_to_lines
996
793
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
997
794
 
998
795
 
999
 
class TestSplitLines(tests.TestCase):
 
796
class TestSplitLines(TestCase):
1000
797
 
1001
798
    def test_split_unicode(self):
1002
799
        self.assertEqual([u'foo\n', u'bar\xae'],
1009
806
                         osutils.split_lines('foo\rbar\n'))
1010
807
 
1011
808
 
1012
 
class TestWalkDirs(tests.TestCaseInTempDir):
1013
 
 
1014
 
    def assertExpectedBlocks(self, expected, result):
1015
 
        self.assertEqual(expected,
1016
 
                         [(dirinfo, [line[0:3] for line in block])
1017
 
                          for dirinfo, block in result])
 
809
class TestWalkDirs(TestCaseInTempDir):
1018
810
 
1019
811
    def test_walkdirs(self):
1020
812
        tree = [
1053
845
            result.append((dirdetail, dirblock))
1054
846
 
1055
847
        self.assertTrue(found_bzrdir)
1056
 
        self.assertExpectedBlocks(expected_dirblocks, result)
 
848
        self.assertEqual(expected_dirblocks,
 
849
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1057
850
        # you can search a subdir only, with a supplied prefix.
1058
851
        result = []
1059
852
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1060
853
            result.append(dirblock)
1061
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
1062
 
 
1063
 
    def test_walkdirs_os_error(self):
1064
 
        # <https://bugs.edge.launchpad.net/bzr/+bug/338653>
1065
 
        # Pyrex readdir didn't raise useful messages if it had an error
1066
 
        # reading the directory
1067
 
        if sys.platform == 'win32':
1068
 
            raise tests.TestNotApplicable(
1069
 
                "readdir IOError not tested on win32")
1070
 
        os.mkdir("test-unreadable")
1071
 
        os.chmod("test-unreadable", 0000)
1072
 
        # must chmod it back so that it can be removed
1073
 
        self.addCleanup(os.chmod, "test-unreadable", 0700)
1074
 
        # The error is not raised until the generator is actually evaluated.
1075
 
        # (It would be ok if it happened earlier but at the moment it
1076
 
        # doesn't.)
1077
 
        e = self.assertRaises(OSError, list, osutils._walkdirs_utf8("."))
1078
 
        self.assertEquals('./test-unreadable', e.filename)
1079
 
        self.assertEquals(errno.EACCES, e.errno)
1080
 
        # Ensure the message contains the file name
1081
 
        self.assertContainsRe(str(e), "\./test-unreadable")
 
854
        self.assertEqual(expected_dirblocks[1:],
 
855
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1082
856
 
1083
857
    def test__walkdirs_utf8(self):
1084
858
        tree = [
1117
891
            result.append((dirdetail, dirblock))
1118
892
 
1119
893
        self.assertTrue(found_bzrdir)
1120
 
        self.assertExpectedBlocks(expected_dirblocks, result)
1121
 
 
 
894
        self.assertEqual(expected_dirblocks,
 
895
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1122
896
        # you can search a subdir only, with a supplied prefix.
1123
897
        result = []
1124
898
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
1125
899
            result.append(dirblock)
1126
 
        self.assertExpectedBlocks(expected_dirblocks[1:], result)
 
900
        self.assertEqual(expected_dirblocks[1:],
 
901
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
1127
902
 
1128
903
    def _filter_out_stat(self, result):
1129
904
        """Filter out the stat value from the walkdirs result"""
1144
919
            osutils._selected_dir_reader = cur_dir_reader
1145
920
        self.addCleanup(restore)
1146
921
 
1147
 
    def assertDirReaderIs(self, expected):
 
922
    def assertReadFSDirIs(self, expected):
1148
923
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
1149
924
        # Force it to redetect
1150
925
        osutils._selected_dir_reader = None
1157
932
        self._save_platform_info()
1158
933
        win32utils.winver = None # Avoid the win32 detection code
1159
934
        osutils._fs_enc = 'UTF-8'
1160
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
935
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1161
936
 
1162
937
    def test_force_walkdirs_utf8_fs_ascii(self):
1163
938
        self.requireFeature(UTF8DirReaderFeature)
1164
939
        self._save_platform_info()
1165
940
        win32utils.winver = None # Avoid the win32 detection code
1166
941
        osutils._fs_enc = 'US-ASCII'
1167
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
942
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1168
943
 
1169
944
    def test_force_walkdirs_utf8_fs_ANSI(self):
1170
945
        self.requireFeature(UTF8DirReaderFeature)
1171
946
        self._save_platform_info()
1172
947
        win32utils.winver = None # Avoid the win32 detection code
1173
948
        osutils._fs_enc = 'ANSI_X3.4-1968'
1174
 
        self.assertDirReaderIs(UTF8DirReaderFeature.reader)
 
949
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
1175
950
 
1176
951
    def test_force_walkdirs_utf8_fs_latin1(self):
1177
952
        self._save_platform_info()
1178
953
        win32utils.winver = None # Avoid the win32 detection code
1179
954
        osutils._fs_enc = 'latin1'
1180
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
955
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
1181
956
 
1182
957
    def test_force_walkdirs_utf8_nt(self):
1183
958
        # Disabled because the thunk of the whole walkdirs api is disabled.
1184
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
959
        self.requireFeature(Win32ReadDirFeature)
1185
960
        self._save_platform_info()
1186
961
        win32utils.winver = 'Windows NT'
1187
962
        from bzrlib._walkdirs_win32 import Win32ReadDir
1188
 
        self.assertDirReaderIs(Win32ReadDir)
 
963
        self.assertReadFSDirIs(Win32ReadDir)
1189
964
 
1190
965
    def test_force_walkdirs_utf8_98(self):
1191
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
966
        self.requireFeature(Win32ReadDirFeature)
1192
967
        self._save_platform_info()
1193
968
        win32utils.winver = 'Windows 98'
1194
 
        self.assertDirReaderIs(osutils.UnicodeDirReader)
 
969
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
1195
970
 
1196
971
    def test_unicode_walkdirs(self):
1197
972
        """Walkdirs should always return unicode paths."""
1198
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1199
973
        name0 = u'0file-\xb6'
1200
974
        name1 = u'1dir-\u062c\u0648'
1201
975
        name2 = u'2file-\u0633'
1206
980
            name1 + '/' + name1 + '/',
1207
981
            name2,
1208
982
            ]
1209
 
        self.build_tree(tree)
 
983
        try:
 
984
            self.build_tree(tree)
 
985
        except UnicodeError:
 
986
            raise TestSkipped('Could not represent Unicode chars'
 
987
                              ' in current encoding.')
1210
988
        expected_dirblocks = [
1211
989
                ((u'', u'.'),
1212
990
                 [(name0, name0, 'file', './' + name0),
1238
1016
 
1239
1017
        The abspath portion might be in unicode or utf-8
1240
1018
        """
1241
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1242
1019
        name0 = u'0file-\xb6'
1243
1020
        name1 = u'1dir-\u062c\u0648'
1244
1021
        name2 = u'2file-\u0633'
1249
1026
            name1 + '/' + name1 + '/',
1250
1027
            name2,
1251
1028
            ]
1252
 
        self.build_tree(tree)
 
1029
        try:
 
1030
            self.build_tree(tree)
 
1031
        except UnicodeError:
 
1032
            raise TestSkipped('Could not represent Unicode chars'
 
1033
                              ' in current encoding.')
1253
1034
        name0 = name0.encode('utf8')
1254
1035
        name1 = name1.encode('utf8')
1255
1036
        name2 = name2.encode('utf8')
1299
1080
 
1300
1081
        The abspath portion should be in unicode
1301
1082
        """
1302
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1303
1083
        # Use the unicode reader. TODO: split into driver-and-driven unit
1304
1084
        # tests.
1305
1085
        self._save_platform_info()
1314
1094
            name1u + '/' + name1u + '/',
1315
1095
            name2u,
1316
1096
            ]
1317
 
        self.build_tree(tree)
 
1097
        try:
 
1098
            self.build_tree(tree)
 
1099
        except UnicodeError:
 
1100
            raise TestSkipped('Could not represent Unicode chars'
 
1101
                              ' in current encoding.')
1318
1102
        name0 = name0u.encode('utf8')
1319
1103
        name1 = name1u.encode('utf8')
1320
1104
        name2 = name2u.encode('utf8')
1345
1129
        self.assertEqual(expected_dirblocks, result)
1346
1130
 
1347
1131
    def test__walkdirs_utf8_win32readdir(self):
1348
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1132
        self.requireFeature(Win32ReadDirFeature)
1349
1133
        self.requireFeature(tests.UnicodeFilenameFeature)
1350
1134
        from bzrlib._walkdirs_win32 import Win32ReadDir
1351
1135
        self._save_platform_info()
1402
1186
 
1403
1187
    def test__walkdirs_utf_win32_find_file_stat_file(self):
1404
1188
        """make sure our Stat values are valid"""
1405
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1189
        self.requireFeature(Win32ReadDirFeature)
1406
1190
        self.requireFeature(tests.UnicodeFilenameFeature)
1407
1191
        from bzrlib._walkdirs_win32 import Win32ReadDir
1408
1192
        name0u = u'0file-\xb6'
1426
1210
 
1427
1211
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
1428
1212
        """make sure our Stat values are valid"""
1429
 
        self.requireFeature(test__walkdirs_win32.win32_readdir_feature)
 
1213
        self.requireFeature(Win32ReadDirFeature)
1430
1214
        self.requireFeature(tests.UnicodeFilenameFeature)
1431
1215
        from bzrlib._walkdirs_win32 import Win32ReadDir
1432
1216
        name0u = u'0dir-\u062c\u0648'
1517
1301
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1518
1302
 
1519
1303
 
1520
 
class TestCopyTree(tests.TestCaseInTempDir):
1521
 
 
 
1304
class TestCopyTree(TestCaseInTempDir):
 
1305
    
1522
1306
    def test_copy_basic_tree(self):
1523
1307
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1524
1308
        osutils.copy_tree('source', 'target')
1533
1317
        self.assertEqual(['c'], os.listdir('target/b'))
1534
1318
 
1535
1319
    def test_copy_tree_symlinks(self):
1536
 
        self.requireFeature(tests.SymlinkFeature)
 
1320
        self.requireFeature(SymlinkFeature)
1537
1321
        self.build_tree(['source/'])
1538
1322
        os.symlink('a/generic/path', 'source/lnk')
1539
1323
        osutils.copy_tree('source', 'target')
1569
1353
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1570
1354
 
1571
1355
 
1572
 
class TestSetUnsetEnv(tests.TestCase):
 
1356
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1357
# [bialix] 2006/12/26
 
1358
 
 
1359
 
 
1360
class TestSetUnsetEnv(TestCase):
1573
1361
    """Test updating the environment"""
1574
1362
 
1575
1363
    def setUp(self):
1599
1387
 
1600
1388
    def test_unicode(self):
1601
1389
        """Environment can only contain plain strings
1602
 
 
 
1390
        
1603
1391
        So Unicode strings must be encoded.
1604
1392
        """
1605
 
        uni_val, env_val = tests.probe_unicode_in_user_encoding()
 
1393
        uni_val, env_val = probe_unicode_in_user_encoding()
1606
1394
        if uni_val is None:
1607
 
            raise tests.TestSkipped(
1608
 
                'Cannot find a unicode character that works in encoding %s'
1609
 
                % (osutils.get_user_encoding(),))
 
1395
            raise TestSkipped('Cannot find a unicode character that works in'
 
1396
                              ' encoding %s' % (osutils.get_user_encoding(),))
1610
1397
 
1611
1398
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1612
1399
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1620
1407
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1621
1408
 
1622
1409
 
1623
 
class TestSizeShaFile(tests.TestCaseInTempDir):
1624
 
 
1625
 
    def test_sha_empty(self):
1626
 
        self.build_tree_contents([('foo', '')])
1627
 
        expected_sha = osutils.sha_string('')
1628
 
        f = open('foo')
1629
 
        self.addCleanup(f.close)
1630
 
        size, sha = osutils.size_sha_file(f)
1631
 
        self.assertEqual(0, size)
1632
 
        self.assertEqual(expected_sha, sha)
1633
 
 
1634
 
    def test_sha_mixed_endings(self):
1635
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1636
 
        self.build_tree_contents([('foo', text)])
1637
 
        expected_sha = osutils.sha_string(text)
1638
 
        f = open('foo', 'rb')
1639
 
        self.addCleanup(f.close)
1640
 
        size, sha = osutils.size_sha_file(f)
1641
 
        self.assertEqual(38, size)
1642
 
        self.assertEqual(expected_sha, sha)
1643
 
 
1644
 
 
1645
 
class TestShaFileByName(tests.TestCaseInTempDir):
1646
 
 
1647
 
    def test_sha_empty(self):
1648
 
        self.build_tree_contents([('foo', '')])
1649
 
        expected_sha = osutils.sha_string('')
1650
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1651
 
 
1652
 
    def test_sha_mixed_endings(self):
1653
 
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
1654
 
        self.build_tree_contents([('foo', text)])
1655
 
        expected_sha = osutils.sha_string(text)
1656
 
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1657
 
 
1658
 
 
1659
 
class TestResourceLoading(tests.TestCaseInTempDir):
 
1410
class TestLocalTimeOffset(TestCase):
 
1411
 
 
1412
    def test_local_time_offset(self):
 
1413
        """Test that local_time_offset() returns a sane value."""
 
1414
        offset = osutils.local_time_offset()
 
1415
        self.assertTrue(isinstance(offset, int))
 
1416
        # Test that the offset is no more than a eighteen hours in
 
1417
        # either direction.
 
1418
        # Time zone handling is system specific, so it is difficult to
 
1419
        # do more specific tests, but a value outside of this range is
 
1420
        # probably wrong.
 
1421
        eighteen_hours = 18 * 3600
 
1422
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1423
 
 
1424
    def test_local_time_offset_with_timestamp(self):
 
1425
        """Test that local_time_offset() works with a timestamp."""
 
1426
        offset = osutils.local_time_offset(1000000000.1234567)
 
1427
        self.assertTrue(isinstance(offset, int))
 
1428
        eighteen_hours = 18 * 3600
 
1429
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1430
 
 
1431
 
 
1432
class TestShaFileByName(TestCaseInTempDir):
 
1433
 
 
1434
    def test_sha_empty(self):
 
1435
        self.build_tree_contents([('foo', '')])
 
1436
        expected_sha = osutils.sha_string('')
 
1437
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1438
 
 
1439
    def test_sha_mixed_endings(self):
 
1440
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1441
        self.build_tree_contents([('foo', text)])
 
1442
        expected_sha = osutils.sha_string(text)
 
1443
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1444
 
 
1445
 
 
1446
_debug_text = \
 
1447
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1448
#
 
1449
# This program is free software; you can redistribute it and/or modify
 
1450
# it under the terms of the GNU General Public License as published by
 
1451
# the Free Software Foundation; either version 2 of the License, or
 
1452
# (at your option) any later version.
 
1453
#
 
1454
# This program is distributed in the hope that it will be useful,
 
1455
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1456
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1457
# GNU General Public License for more details.
 
1458
#
 
1459
# You should have received a copy of the GNU General Public License
 
1460
# along with this program; if not, write to the Free Software
 
1461
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1462
 
 
1463
 
 
1464
# NOTE: If update these, please also update the help for global-options in
 
1465
#       bzrlib/help_topics/__init__.py
 
1466
 
 
1467
debug_flags = set()
 
1468
"""Set of flags that enable different debug behaviour.
 
1469
 
 
1470
These are set with eg ``-Dlock`` on the bzr command line.
 
1471
 
 
1472
Options include:
 
1473
 
 
1474
 * auth - show authentication sections used
 
1475
 * error - show stack traces for all top level exceptions
 
1476
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1477
 * fetch - trace history copying between repositories
 
1478
 * graph - trace graph traversal information
 
1479
 * hashcache - log every time a working file is read to determine its hash
 
1480
 * hooks - trace hook execution
 
1481
 * hpss - trace smart protocol requests and responses
 
1482
 * http - trace http connections, requests and responses
 
1483
 * index - trace major index operations
 
1484
 * knit - trace knit operations
 
1485
 * lock - trace when lockdir locks are taken or released
 
1486
 * merge - emit information for debugging merges
 
1487
 * pack - emit information about pack operations
 
1488
 
 
1489
"""
 
1490
'''
 
1491
 
 
1492
 
 
1493
class TestResourceLoading(TestCaseInTempDir):
1660
1494
 
1661
1495
    def test_resource_string(self):
1662
1496
        # test resource in bzrlib
1663
1497
        text = osutils.resource_string('bzrlib', 'debug.py')
1664
 
        self.assertContainsRe(text, "debug_flags = set()")
 
1498
        self.assertEquals(_debug_text, text)
1665
1499
        # test resource under bzrlib
1666
1500
        text = osutils.resource_string('bzrlib.ui', 'text.py')
1667
1501
        self.assertContainsRe(text, "class TextUIFactory")
1670
1504
            'yyy.xx')
1671
1505
        # test unknown resource
1672
1506
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')
1673
 
 
1674
 
 
1675
 
class TestReCompile(tests.TestCase):
1676
 
 
1677
 
    def test_re_compile_checked(self):
1678
 
        r = osutils.re_compile_checked(r'A*', re.IGNORECASE)
1679
 
        self.assertTrue(r.match('aaaa'))
1680
 
        self.assertTrue(r.match('aAaA'))
1681
 
 
1682
 
    def test_re_compile_checked_error(self):
1683
 
        # like https://bugs.launchpad.net/bzr/+bug/251352
1684
 
        err = self.assertRaises(
1685
 
            errors.BzrCommandError,
1686
 
            osutils.re_compile_checked, '*', re.IGNORECASE, 'test case')
1687
 
        self.assertEqual(
1688
 
            "Invalid regular expression in test case: '*': "
1689
 
            "nothing to repeat",
1690
 
            str(err))
1691
 
 
1692
 
 
1693
 
class TestDirReader(tests.TestCaseInTempDir):
1694
 
 
1695
 
    # Set by load_tests
1696
 
    _dir_reader_class = None
1697
 
    _native_to_unicode = None
1698
 
 
1699
 
    def setUp(self):
1700
 
        tests.TestCaseInTempDir.setUp(self)
1701
 
 
1702
 
        # Save platform specific info and reset it
1703
 
        cur_dir_reader = osutils._selected_dir_reader
1704
 
 
1705
 
        def restore():
1706
 
            osutils._selected_dir_reader = cur_dir_reader
1707
 
        self.addCleanup(restore)
1708
 
 
1709
 
        osutils._selected_dir_reader = self._dir_reader_class()
1710
 
 
1711
 
    def _get_ascii_tree(self):
1712
 
        tree = [
1713
 
            '0file',
1714
 
            '1dir/',
1715
 
            '1dir/0file',
1716
 
            '1dir/1dir/',
1717
 
            '2file'
1718
 
            ]
1719
 
        expected_dirblocks = [
1720
 
                (('', '.'),
1721
 
                 [('0file', '0file', 'file'),
1722
 
                  ('1dir', '1dir', 'directory'),
1723
 
                  ('2file', '2file', 'file'),
1724
 
                 ]
1725
 
                ),
1726
 
                (('1dir', './1dir'),
1727
 
                 [('1dir/0file', '0file', 'file'),
1728
 
                  ('1dir/1dir', '1dir', 'directory'),
1729
 
                 ]
1730
 
                ),
1731
 
                (('1dir/1dir', './1dir/1dir'),
1732
 
                 [
1733
 
                 ]
1734
 
                ),
1735
 
            ]
1736
 
        return tree, expected_dirblocks
1737
 
 
1738
 
    def test_walk_cur_dir(self):
1739
 
        tree, expected_dirblocks = self._get_ascii_tree()
1740
 
        self.build_tree(tree)
1741
 
        result = list(osutils._walkdirs_utf8('.'))
1742
 
        # Filter out stat and abspath
1743
 
        self.assertEqual(expected_dirblocks,
1744
 
                         [(dirinfo, [line[0:3] for line in block])
1745
 
                          for dirinfo, block in result])
1746
 
 
1747
 
    def test_walk_sub_dir(self):
1748
 
        tree, expected_dirblocks = self._get_ascii_tree()
1749
 
        self.build_tree(tree)
1750
 
        # you can search a subdir only, with a supplied prefix.
1751
 
        result = list(osutils._walkdirs_utf8('./1dir', '1dir'))
1752
 
        # Filter out stat and abspath
1753
 
        self.assertEqual(expected_dirblocks[1:],
1754
 
                         [(dirinfo, [line[0:3] for line in block])
1755
 
                          for dirinfo, block in result])
1756
 
 
1757
 
    def _get_unicode_tree(self):
1758
 
        name0u = u'0file-\xb6'
1759
 
        name1u = u'1dir-\u062c\u0648'
1760
 
        name2u = u'2file-\u0633'
1761
 
        tree = [
1762
 
            name0u,
1763
 
            name1u + '/',
1764
 
            name1u + '/' + name0u,
1765
 
            name1u + '/' + name1u + '/',
1766
 
            name2u,
1767
 
            ]
1768
 
        name0 = name0u.encode('UTF-8')
1769
 
        name1 = name1u.encode('UTF-8')
1770
 
        name2 = name2u.encode('UTF-8')
1771
 
        expected_dirblocks = [
1772
 
                (('', '.'),
1773
 
                 [(name0, name0, 'file', './' + name0u),
1774
 
                  (name1, name1, 'directory', './' + name1u),
1775
 
                  (name2, name2, 'file', './' + name2u),
1776
 
                 ]
1777
 
                ),
1778
 
                ((name1, './' + name1u),
1779
 
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
1780
 
                                                        + '/' + name0u),
1781
 
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
1782
 
                                                            + '/' + name1u),
1783
 
                 ]
1784
 
                ),
1785
 
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
1786
 
                 [
1787
 
                 ]
1788
 
                ),
1789
 
            ]
1790
 
        return tree, expected_dirblocks
1791
 
 
1792
 
    def _filter_out(self, raw_dirblocks):
1793
 
        """Filter out a walkdirs_utf8 result.
1794
 
 
1795
 
        stat field is removed, all native paths are converted to unicode
1796
 
        """
1797
 
        filtered_dirblocks = []
1798
 
        for dirinfo, block in raw_dirblocks:
1799
 
            dirinfo = (dirinfo[0], self._native_to_unicode(dirinfo[1]))
1800
 
            details = []
1801
 
            for line in block:
1802
 
                details.append(line[0:3] + (self._native_to_unicode(line[4]), ))
1803
 
            filtered_dirblocks.append((dirinfo, details))
1804
 
        return filtered_dirblocks
1805
 
 
1806
 
    def test_walk_unicode_tree(self):
1807
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1808
 
        tree, expected_dirblocks = self._get_unicode_tree()
1809
 
        self.build_tree(tree)
1810
 
        result = list(osutils._walkdirs_utf8('.'))
1811
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1812
 
 
1813
 
    def test_symlink(self):
1814
 
        self.requireFeature(tests.SymlinkFeature)
1815
 
        self.requireFeature(tests.UnicodeFilenameFeature)
1816
 
        target = u'target\N{Euro Sign}'
1817
 
        link_name = u'l\N{Euro Sign}nk'
1818
 
        os.symlink(target, link_name)
1819
 
        target_utf8 = target.encode('UTF-8')
1820
 
        link_name_utf8 = link_name.encode('UTF-8')
1821
 
        expected_dirblocks = [
1822
 
                (('', '.'),
1823
 
                 [(link_name_utf8, link_name_utf8,
1824
 
                   'symlink', './' + link_name),],
1825
 
                 )]
1826
 
        result = list(osutils._walkdirs_utf8('.'))
1827
 
        self.assertEqual(expected_dirblocks, self._filter_out(result))
1828
 
 
1829
 
 
1830
 
class TestReadLink(tests.TestCaseInTempDir):
1831
 
    """Exposes os.readlink() problems and the osutils solution.
1832
 
 
1833
 
    The only guarantee offered by os.readlink(), starting with 2.6, is that a
1834
 
    unicode string will be returned if a unicode string is passed.
1835
 
 
1836
 
    But prior python versions failed to properly encode the passed unicode
1837
 
    string.
1838
 
    """
1839
 
    _test_needs_features = [tests.SymlinkFeature, tests.UnicodeFilenameFeature]
1840
 
 
1841
 
    def setUp(self):
1842
 
        super(tests.TestCaseInTempDir, self).setUp()
1843
 
        self.link = u'l\N{Euro Sign}ink'
1844
 
        self.target = u'targe\N{Euro Sign}t'
1845
 
        os.symlink(self.target, self.link)
1846
 
 
1847
 
    def test_os_readlink_link_encoding(self):
1848
 
        if sys.version_info < (2, 6):
1849
 
            self.assertRaises(UnicodeEncodeError, os.readlink, self.link)
1850
 
        else:
1851
 
            self.assertEquals(self.target,  os.readlink(self.link))
1852
 
 
1853
 
    def test_os_readlink_link_decoding(self):
1854
 
        self.assertEquals(self.target.encode(osutils._fs_enc),
1855
 
                          os.readlink(self.link.encode(osutils._fs_enc)))
1856
 
 
1857
 
 
1858
 
class TestConcurrency(tests.TestCase):
1859
 
 
1860
 
    def setUp(self):
1861
 
        super(TestConcurrency, self).setUp()
1862
 
        orig = osutils._cached_local_concurrency
1863
 
        def restore():
1864
 
            osutils._cached_local_concurrency = orig
1865
 
        self.addCleanup(restore)
1866
 
 
1867
 
    def test_local_concurrency(self):
1868
 
        concurrency = osutils.local_concurrency()
1869
 
        self.assertIsInstance(concurrency, int)
1870
 
 
1871
 
    def test_local_concurrency_environment_variable(self):
1872
 
        os.environ['BZR_CONCURRENCY'] = '2'
1873
 
        self.assertEqual(2, osutils.local_concurrency(use_cache=False))
1874
 
        os.environ['BZR_CONCURRENCY'] = '3'
1875
 
        self.assertEqual(3, osutils.local_concurrency(use_cache=False))
1876
 
        os.environ['BZR_CONCURRENCY'] = 'foo'
1877
 
        self.assertEqual(1, osutils.local_concurrency(use_cache=False))
1878
 
 
1879
 
    def test_option_concurrency(self):
1880
 
        os.environ['BZR_CONCURRENCY'] = '1'
1881
 
        self.run_bzr('rocks --concurrency 42')
1882
 
        # Command line overrides envrionment variable
1883
 
        self.assertEquals('42', os.environ['BZR_CONCURRENCY'])
1884
 
        self.assertEquals(42, osutils.local_concurrency(use_cache=False))
1885
 
 
1886
 
 
1887
 
class TestFailedToLoadExtension(tests.TestCase):
1888
 
 
1889
 
    def _try_loading(self):
1890
 
        try:
1891
 
            import bzrlib._fictional_extension_py
1892
 
        except ImportError, e:
1893
 
            osutils.failed_to_load_extension(e)
1894
 
            return True
1895
 
 
1896
 
    def setUp(self):
1897
 
        super(TestFailedToLoadExtension, self).setUp()
1898
 
        self.saved_failures = osutils._extension_load_failures[:]
1899
 
        del osutils._extension_load_failures[:]
1900
 
        self.addCleanup(self.restore_failures)
1901
 
 
1902
 
    def restore_failures(self):
1903
 
        osutils._extension_load_failures = self.saved_failures
1904
 
 
1905
 
    def test_failure_to_load(self):
1906
 
        self._try_loading()
1907
 
        self.assertLength(1, osutils._extension_load_failures)
1908
 
        self.assertEquals(osutils._extension_load_failures[0],
1909
 
            "No module named _fictional_extension_py")
1910
 
 
1911
 
    def test_report_extension_load_failures_no_warning(self):
1912
 
        self.assertTrue(self._try_loading())
1913
 
        warnings, result = self.callCatchWarnings(osutils.report_extension_load_failures)
1914
 
        # it used to give a Python warning; it no longer does
1915
 
        self.assertLength(0, warnings)
1916
 
 
1917
 
    def test_report_extension_load_failures_message(self):
1918
 
        log = StringIO()
1919
 
        trace.push_log_file(log)
1920
 
        self.assertTrue(self._try_loading())
1921
 
        osutils.report_extension_load_failures()
1922
 
        self.assertContainsRe(
1923
 
            log.getvalue(),
1924
 
            r"bzr: warning: some compiled extensions could not be loaded; "
1925
 
            "see <https://answers\.launchpad\.net/bzr/\+faq/703>\n"
1926
 
            )
1927
 
 
1928
 
 
1929
 
class TestTerminalWidth(tests.TestCase):
1930
 
 
1931
 
    def replace_stdout(self, new):
1932
 
        orig_stdout = sys.stdout
1933
 
        def restore():
1934
 
            sys.stdout = orig_stdout
1935
 
        self.addCleanup(restore)
1936
 
        sys.stdout = new
1937
 
 
1938
 
    def replace__terminal_size(self, new):
1939
 
        orig__terminal_size = osutils._terminal_size
1940
 
        def restore():
1941
 
            osutils._terminal_size = orig__terminal_size
1942
 
        self.addCleanup(restore)
1943
 
        osutils._terminal_size = new
1944
 
 
1945
 
    def set_fake_tty(self):
1946
 
 
1947
 
        class I_am_a_tty(object):
1948
 
            def isatty(self):
1949
 
                return True
1950
 
 
1951
 
        self.replace_stdout(I_am_a_tty())
1952
 
 
1953
 
    def test_default_values(self):
1954
 
        self.assertEqual(80, osutils.default_terminal_width)
1955
 
 
1956
 
    def test_defaults_to_BZR_COLUMNS(self):
1957
 
        # BZR_COLUMNS is set by the test framework
1958
 
        self.assertNotEqual('12', os.environ['BZR_COLUMNS'])
1959
 
        os.environ['BZR_COLUMNS'] = '12'
1960
 
        self.assertEqual(12, osutils.terminal_width())
1961
 
 
1962
 
    def test_falls_back_to_COLUMNS(self):
1963
 
        del os.environ['BZR_COLUMNS']
1964
 
        self.assertNotEqual('42', os.environ['COLUMNS'])
1965
 
        self.set_fake_tty()
1966
 
        os.environ['COLUMNS'] = '42'
1967
 
        self.assertEqual(42, osutils.terminal_width())
1968
 
 
1969
 
    def test_tty_default_without_columns(self):
1970
 
        del os.environ['BZR_COLUMNS']
1971
 
        del os.environ['COLUMNS']
1972
 
 
1973
 
        def terminal_size(w, h):
1974
 
            return 42, 42
1975
 
 
1976
 
        self.set_fake_tty()
1977
 
        # We need to override the osutils definition as it depends on the
1978
 
        # running environment that we can't control (PQM running without a
1979
 
        # controlling terminal is one example).
1980
 
        self.replace__terminal_size(terminal_size)
1981
 
        self.assertEqual(42, osutils.terminal_width())
1982
 
 
1983
 
    def test_non_tty_default_without_columns(self):
1984
 
        del os.environ['BZR_COLUMNS']
1985
 
        del os.environ['COLUMNS']
1986
 
        self.replace_stdout(None)
1987
 
        self.assertEqual(None, osutils.terminal_width())
1988
 
 
1989
 
    def test_no_TIOCGWINSZ(self):
1990
 
        self.requireFeature(term_ios_feature)
1991
 
        termios = term_ios_feature.module
1992
 
        # bug 63539 is about a termios without TIOCGWINSZ attribute
1993
 
        try:
1994
 
            orig = termios.TIOCGWINSZ
1995
 
        except AttributeError:
1996
 
            # We won't remove TIOCGWINSZ, because it doesn't exist anyway :)
1997
 
            pass
1998
 
        else:
1999
 
            def restore():
2000
 
                termios.TIOCGWINSZ = orig
2001
 
            self.addCleanup(restore)
2002
 
            del termios.TIOCGWINSZ
2003
 
        del os.environ['BZR_COLUMNS']
2004
 
        del os.environ['COLUMNS']
2005
 
        # Whatever the result is, if we don't raise an exception, it's ok.
2006
 
        osutils.terminal_width()