~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Ian Clatworthy
  • Date: 2009-01-19 02:24:15 UTC
  • mto: This revision was merged to the branch mainline in revision 3944.
  • Revision ID: ian.clatworthy@canonical.com-20090119022415-mo0mcfeiexfktgwt
apply jam's log --short fix (Ian Clatworthy)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the osutils wrapper."""
18
18
 
 
19
from cStringIO import StringIO
19
20
import errno
20
21
import os
21
22
import socket
22
23
import stat
23
24
import sys
 
25
import time
24
26
 
25
 
import bzrlib
 
27
from bzrlib import (
 
28
    errors,
 
29
    osutils,
 
30
    tests,
 
31
    win32utils,
 
32
    )
26
33
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
27
 
import bzrlib.osutils as osutils
28
 
from bzrlib.tests import TestCaseInTempDir, TestCase, TestSkipped
 
34
from bzrlib.osutils import (
 
35
        is_inside_any,
 
36
        is_inside_or_parent_of_any,
 
37
        pathjoin,
 
38
        pumpfile,
 
39
        pump_string_file,
 
40
        canonical_relpath,
 
41
        )
 
42
from bzrlib.tests import (
 
43
        adapt_tests,
 
44
        Feature,
 
45
        probe_unicode_in_user_encoding,
 
46
        split_suite_by_re,
 
47
        StringIOWrapper,
 
48
        SymlinkFeature,
 
49
        CaseInsCasePresFilenameFeature,
 
50
        TestCase,
 
51
        TestCaseInTempDir,
 
52
        TestScenarioApplier,
 
53
        TestSkipped,
 
54
        )
 
55
from bzrlib.tests.file_utils import (
 
56
    FakeReadFile,
 
57
    )
 
58
from bzrlib.tests.test__walkdirs_win32 import Win32ReadDirFeature
 
59
 
 
60
 
 
61
class _UTF8DirReaderFeature(Feature):
 
62
 
 
63
    def _probe(self):
 
64
        try:
 
65
            from bzrlib import _readdir_pyx
 
66
            self.reader = _readdir_pyx.UTF8DirReader
 
67
            return True
 
68
        except ImportError:
 
69
            return False
 
70
 
 
71
    def feature_name(self):
 
72
        return 'bzrlib._readdir_pyx'
 
73
 
 
74
UTF8DirReaderFeature = _UTF8DirReaderFeature()
29
75
 
30
76
 
31
77
class TestOSUtils(TestCaseInTempDir):
32
78
 
 
79
    def test_contains_whitespace(self):
 
80
        self.failUnless(osutils.contains_whitespace(u' '))
 
81
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
82
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
83
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
84
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
85
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
86
 
 
87
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
88
        # is whitespace, but we do not.
 
89
        self.failIf(osutils.contains_whitespace(u''))
 
90
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
91
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
92
 
33
93
    def test_fancy_rename(self):
34
94
        # This should work everywhere
35
95
        def rename(a, b):
63
123
 
64
124
    # TODO: test fancy_rename using a MemoryTransport
65
125
 
 
126
    def test_rename_change_case(self):
 
127
        # on Windows we should be able to change filename case by rename
 
128
        self.build_tree(['a', 'b/'])
 
129
        osutils.rename('a', 'A')
 
130
        osutils.rename('b', 'B')
 
131
        # we can't use failUnlessExists on case-insensitive filesystem
 
132
        # so try to check shape of the tree
 
133
        shape = sorted(os.listdir('.'))
 
134
        self.assertEquals(['A', 'B'], shape)
 
135
 
66
136
    def test_01_rand_chars_empty(self):
67
137
        result = osutils.rand_chars(0)
68
138
        self.assertEqual(result, '')
73
143
        self.assertEqual(type(result), str)
74
144
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
75
145
 
 
146
    def test_is_inside(self):
 
147
        is_inside = osutils.is_inside
 
148
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
149
        self.assertFalse(is_inside('src', 'srccontrol'))
 
150
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
151
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
152
        self.assertFalse(is_inside('foo.c', ''))
 
153
        self.assertTrue(is_inside('', 'foo.c'))
 
154
 
 
155
    def test_is_inside_any(self):
 
156
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
157
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
158
                         (['src'], SRC_FOO_C),
 
159
                         (['src'], 'src'),
 
160
                         ]:
 
161
            self.assert_(is_inside_any(dirs, fn))
 
162
        for dirs, fn in [(['src'], 'srccontrol'),
 
163
                         (['src'], 'srccontrol/foo')]:
 
164
            self.assertFalse(is_inside_any(dirs, fn))
 
165
 
 
166
    def test_is_inside_or_parent_of_any(self):
 
167
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
168
                         (['src'], 'src/foo.c'),
 
169
                         (['src/bar.c'], 'src'),
 
170
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
171
                         (['src'], 'src'),
 
172
                         ]:
 
173
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
174
            
 
175
        for dirs, fn in [(['src'], 'srccontrol'),
 
176
                         (['srccontrol/foo.c'], 'src'),
 
177
                         (['src'], 'srccontrol/foo')]:
 
178
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
76
179
 
77
180
    def test_rmtree(self):
78
181
        # Check to remove tree with read-only files/dirs
126
229
            finally:
127
230
                os.remove('socket')
128
231
 
 
232
    def test_kind_marker(self):
 
233
        self.assertEqual(osutils.kind_marker('file'), '')
 
234
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
235
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
236
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
 
237
 
 
238
    def test_get_umask(self):
 
239
        if sys.platform == 'win32':
 
240
            # umask always returns '0', no way to set it
 
241
            self.assertEqual(0, osutils.get_umask())
 
242
            return
 
243
 
 
244
        orig_umask = osutils.get_umask()
 
245
        try:
 
246
            os.umask(0222)
 
247
            self.assertEqual(0222, osutils.get_umask())
 
248
            os.umask(0022)
 
249
            self.assertEqual(0022, osutils.get_umask())
 
250
            os.umask(0002)
 
251
            self.assertEqual(0002, osutils.get_umask())
 
252
            os.umask(0027)
 
253
            self.assertEqual(0027, osutils.get_umask())
 
254
        finally:
 
255
            os.umask(orig_umask)
 
256
 
 
257
    def assertFormatedDelta(self, expected, seconds):
 
258
        """Assert osutils.format_delta formats as expected"""
 
259
        actual = osutils.format_delta(seconds)
 
260
        self.assertEqual(expected, actual)
 
261
 
 
262
    def test_format_delta(self):
 
263
        self.assertFormatedDelta('0 seconds ago', 0)
 
264
        self.assertFormatedDelta('1 second ago', 1)
 
265
        self.assertFormatedDelta('10 seconds ago', 10)
 
266
        self.assertFormatedDelta('59 seconds ago', 59)
 
267
        self.assertFormatedDelta('89 seconds ago', 89)
 
268
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
269
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
270
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
271
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
272
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
273
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
274
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
275
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
276
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
277
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
278
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
279
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
280
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
281
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
282
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
283
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
284
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
285
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
286
 
 
287
        # We handle when time steps the wrong direction because computers
 
288
        # don't have synchronized clocks.
 
289
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
290
        self.assertFormatedDelta('1 second in the future', -1)
 
291
        self.assertFormatedDelta('2 seconds in the future', -2)
 
292
 
 
293
    def test_format_date(self):
 
294
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
295
            osutils.format_date, 0, timezone='foo')
 
296
        self.assertIsInstance(osutils.format_date(0), str)
 
297
        self.assertIsInstance(osutils.format_local_date(0), unicode)
 
298
        # Testing for the actual value of the local weekday without
 
299
        # duplicating the code from format_date is difficult.
 
300
        # Instead blackbox.test_locale should check for localized
 
301
        # dates once they do occur in output strings.
 
302
 
 
303
    def test_dereference_path(self):
 
304
        self.requireFeature(SymlinkFeature)
 
305
        cwd = osutils.realpath('.')
 
306
        os.mkdir('bar')
 
307
        bar_path = osutils.pathjoin(cwd, 'bar')
 
308
        # Using './' to avoid bug #1213894 (first path component not
 
309
        # dereferenced) in Python 2.4.1 and earlier
 
310
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
311
        os.symlink('bar', 'foo')
 
312
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
313
        
 
314
        # Does not dereference terminal symlinks
 
315
        foo_path = osutils.pathjoin(cwd, 'foo')
 
316
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
317
 
 
318
        # Dereferences parent symlinks
 
319
        os.mkdir('bar/baz')
 
320
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
321
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
322
 
 
323
        # Dereferences parent symlinks that are the first path element
 
324
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
325
 
 
326
        # Dereferences parent symlinks in absolute paths
 
327
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
328
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
329
 
 
330
    def test_changing_access(self):
 
331
        f = file('file', 'w')
 
332
        f.write('monkey')
 
333
        f.close()
 
334
 
 
335
        # Make a file readonly
 
336
        osutils.make_readonly('file')
 
337
        mode = os.lstat('file').st_mode
 
338
        self.assertEqual(mode, mode & 0777555)
 
339
 
 
340
        # Make a file writable
 
341
        osutils.make_writable('file')
 
342
        mode = os.lstat('file').st_mode
 
343
        self.assertEqual(mode, mode | 0200)
 
344
 
 
345
        if osutils.has_symlinks():
 
346
            # should not error when handed a symlink
 
347
            os.symlink('nonexistent', 'dangling')
 
348
            osutils.make_readonly('dangling')
 
349
            osutils.make_writable('dangling')
 
350
 
 
351
    def test_kind_marker(self):
 
352
        self.assertEqual("", osutils.kind_marker("file"))
 
353
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
354
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
355
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
356
 
 
357
    def test_host_os_dereferences_symlinks(self):
 
358
        osutils.host_os_dereferences_symlinks()
 
359
 
 
360
 
 
361
class TestCanonicalRelPath(TestCaseInTempDir):
 
362
 
 
363
    _test_needs_features = [CaseInsCasePresFilenameFeature]
 
364
 
 
365
    def test_canonical_relpath_simple(self):
 
366
        f = file('MixedCaseName', 'w')
 
367
        f.close()
 
368
        self.failUnlessEqual(
 
369
            canonical_relpath(self.test_base_dir, 'mixedcasename'),
 
370
            'work/MixedCaseName')
 
371
 
 
372
    def test_canonical_relpath_missing_tail(self):
 
373
        os.mkdir('MixedCaseParent')
 
374
        self.failUnlessEqual(
 
375
            canonical_relpath(self.test_base_dir, 'mixedcaseparent/nochild'),
 
376
            'work/MixedCaseParent/nochild')
 
377
 
 
378
 
 
379
class TestPumpFile(TestCase):
 
380
    """Test pumpfile method."""
 
381
    def setUp(self):
 
382
        # create a test datablock
 
383
        self.block_size = 512
 
384
        pattern = '0123456789ABCDEF'
 
385
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
386
        self.test_data_len = len(self.test_data)
 
387
 
 
388
    def test_bracket_block_size(self):
 
389
        """Read data in blocks with the requested read size bracketing the
 
390
        block size."""
 
391
        # make sure test data is larger than max read size
 
392
        self.assertTrue(self.test_data_len > self.block_size)
 
393
 
 
394
        from_file = FakeReadFile(self.test_data)
 
395
        to_file = StringIO()
 
396
 
 
397
        # read (max / 2) bytes and verify read size wasn't affected
 
398
        num_bytes_to_read = self.block_size / 2
 
399
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
400
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
401
        self.assertEqual(from_file.get_read_count(), 1)
 
402
 
 
403
        # read (max) bytes and verify read size wasn't affected
 
404
        num_bytes_to_read = self.block_size
 
405
        from_file.reset_read_count()
 
406
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
407
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
408
        self.assertEqual(from_file.get_read_count(), 1)
 
409
 
 
410
        # read (max + 1) bytes and verify read size was limited
 
411
        num_bytes_to_read = self.block_size + 1
 
412
        from_file.reset_read_count()
 
413
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
414
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
415
        self.assertEqual(from_file.get_read_count(), 2)
 
416
 
 
417
        # finish reading the rest of the data
 
418
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
419
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
420
 
 
421
        # report error if the data wasn't equal (we only report the size due
 
422
        # to the length of the data)
 
423
        response_data = to_file.getvalue()
 
424
        if response_data != self.test_data:
 
425
            message = "Data not equal.  Expected %d bytes, received %d."
 
426
            self.fail(message % (len(response_data), self.test_data_len))
 
427
 
 
428
    def test_specified_size(self):
 
429
        """Request a transfer larger than the maximum block size and verify
 
430
        that the maximum read doesn't exceed the block_size."""
 
431
        # make sure test data is larger than max read size
 
432
        self.assertTrue(self.test_data_len > self.block_size)
 
433
 
 
434
        # retrieve data in blocks
 
435
        from_file = FakeReadFile(self.test_data)
 
436
        to_file = StringIO()
 
437
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
 
438
 
 
439
        # verify read size was equal to the maximum read size
 
440
        self.assertTrue(from_file.get_max_read_size() > 0)
 
441
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
442
        self.assertEqual(from_file.get_read_count(), 3)
 
443
 
 
444
        # report error if the data wasn't equal (we only report the size due
 
445
        # to the length of the data)
 
446
        response_data = to_file.getvalue()
 
447
        if response_data != self.test_data:
 
448
            message = "Data not equal.  Expected %d bytes, received %d."
 
449
            self.fail(message % (len(response_data), self.test_data_len))
 
450
 
 
451
    def test_to_eof(self):
 
452
        """Read to end-of-file and verify that the reads are not larger than
 
453
        the maximum read size."""
 
454
        # make sure test data is larger than max read size
 
455
        self.assertTrue(self.test_data_len > self.block_size)
 
456
 
 
457
        # retrieve data to EOF
 
458
        from_file = FakeReadFile(self.test_data)
 
459
        to_file = StringIO()
 
460
        pumpfile(from_file, to_file, -1, self.block_size)
 
461
 
 
462
        # verify read size was equal to the maximum read size
 
463
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
464
        self.assertEqual(from_file.get_read_count(), 4)
 
465
 
 
466
        # report error if the data wasn't equal (we only report the size due
 
467
        # to the length of the data)
 
468
        response_data = to_file.getvalue()
 
469
        if response_data != self.test_data:
 
470
            message = "Data not equal.  Expected %d bytes, received %d."
 
471
            self.fail(message % (len(response_data), self.test_data_len))
 
472
 
 
473
    def test_defaults(self):
 
474
        """Verifies that the default arguments will read to EOF -- this
 
475
        test verifies that any existing usages of pumpfile will not be broken
 
476
        with this new version."""
 
477
        # retrieve data using default (old) pumpfile method
 
478
        from_file = FakeReadFile(self.test_data)
 
479
        to_file = StringIO()
 
480
        pumpfile(from_file, to_file)
 
481
 
 
482
        # report error if the data wasn't equal (we only report the size due
 
483
        # to the length of the data)
 
484
        response_data = to_file.getvalue()
 
485
        if response_data != self.test_data:
 
486
            message = "Data not equal.  Expected %d bytes, received %d."
 
487
            self.fail(message % (len(response_data), self.test_data_len))
 
488
 
 
489
 
 
490
class TestPumpStringFile(TestCase):
 
491
 
 
492
    def test_empty(self):
 
493
        output = StringIO()
 
494
        pump_string_file("", output)
 
495
        self.assertEqual("", output.getvalue())
 
496
 
 
497
    def test_more_than_segment_size(self):
 
498
        output = StringIO()
 
499
        pump_string_file("123456789", output, 2)
 
500
        self.assertEqual("123456789", output.getvalue())
 
501
 
 
502
    def test_segment_size(self):
 
503
        output = StringIO()
 
504
        pump_string_file("12", output, 2)
 
505
        self.assertEqual("12", output.getvalue())
 
506
 
 
507
    def test_segment_size_multiple(self):
 
508
        output = StringIO()
 
509
        pump_string_file("1234", output, 2)
 
510
        self.assertEqual("1234", output.getvalue())
 
511
 
129
512
 
130
513
class TestSafeUnicode(TestCase):
131
514
 
147
530
                          '\xbb\xbb')
148
531
 
149
532
 
 
533
class TestSafeUtf8(TestCase):
 
534
 
 
535
    def test_from_ascii_string(self):
 
536
        f = 'foobar'
 
537
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
538
 
 
539
    def test_from_unicode_string_ascii_contents(self):
 
540
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
541
 
 
542
    def test_from_unicode_string_unicode_contents(self):
 
543
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
544
 
 
545
    def test_from_utf8_string(self):
 
546
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
547
 
 
548
    def test_bad_utf8_string(self):
 
549
        self.assertRaises(BzrBadParameterNotUnicode,
 
550
                          osutils.safe_utf8, '\xbb\xbb')
 
551
 
 
552
 
 
553
class TestSafeRevisionId(TestCase):
 
554
 
 
555
    def test_from_ascii_string(self):
 
556
        # this shouldn't give a warning because it's getting an ascii string
 
557
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
558
 
 
559
    def test_from_unicode_string_ascii_contents(self):
 
560
        self.assertEqual('bargam',
 
561
                         osutils.safe_revision_id(u'bargam', warn=False))
 
562
 
 
563
    def test_from_unicode_deprecated(self):
 
564
        self.assertEqual('bargam',
 
565
            self.callDeprecated([osutils._revision_id_warning],
 
566
                                osutils.safe_revision_id, u'bargam'))
 
567
 
 
568
    def test_from_unicode_string_unicode_contents(self):
 
569
        self.assertEqual('bargam\xc2\xae',
 
570
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
571
 
 
572
    def test_from_utf8_string(self):
 
573
        self.assertEqual('foo\xc2\xae',
 
574
                         osutils.safe_revision_id('foo\xc2\xae'))
 
575
 
 
576
    def test_none(self):
 
577
        """Currently, None is a valid revision_id"""
 
578
        self.assertEqual(None, osutils.safe_revision_id(None))
 
579
 
 
580
 
 
581
class TestSafeFileId(TestCase):
 
582
 
 
583
    def test_from_ascii_string(self):
 
584
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
585
 
 
586
    def test_from_unicode_string_ascii_contents(self):
 
587
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
588
 
 
589
    def test_from_unicode_deprecated(self):
 
590
        self.assertEqual('bargam',
 
591
            self.callDeprecated([osutils._file_id_warning],
 
592
                                osutils.safe_file_id, u'bargam'))
 
593
 
 
594
    def test_from_unicode_string_unicode_contents(self):
 
595
        self.assertEqual('bargam\xc2\xae',
 
596
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
597
 
 
598
    def test_from_utf8_string(self):
 
599
        self.assertEqual('foo\xc2\xae',
 
600
                         osutils.safe_file_id('foo\xc2\xae'))
 
601
 
 
602
    def test_none(self):
 
603
        """Currently, None is a valid revision_id"""
 
604
        self.assertEqual(None, osutils.safe_file_id(None))
 
605
 
 
606
 
150
607
class TestWin32Funcs(TestCase):
151
608
    """Test that the _win32 versions of os utilities return appropriate paths."""
152
609
 
153
610
    def test_abspath(self):
154
611
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
155
612
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
613
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
614
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
156
615
 
157
616
    def test_realpath(self):
158
617
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
171
630
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
172
631
 
173
632
    def test_getcwd(self):
174
 
        self.assertEqual(os.getcwdu().replace('\\', '/'), osutils._win32_getcwd())
 
633
        cwd = osutils._win32_getcwd()
 
634
        os_cwd = os.getcwdu()
 
635
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
636
        # win32 is inconsistent whether it returns lower or upper case
 
637
        # and even if it was consistent the user might type the other
 
638
        # so we force it to uppercase
 
639
        # running python.exe under cmd.exe return capital C:\\
 
640
        # running win32 python inside a cygwin shell returns lowercase
 
641
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
642
 
 
643
    def test_fixdrive(self):
 
644
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
645
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
646
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
647
 
 
648
    def test_win98_abspath(self):
 
649
        # absolute path
 
650
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
651
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
652
        # UNC path
 
653
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
654
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
655
        # relative path
 
656
        cwd = osutils.getcwd().rstrip('/')
 
657
        drive = osutils._nt_splitdrive(cwd)[0]
 
658
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
659
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
660
        # unicode path
 
661
        u = u'\u1234'
 
662
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
175
663
 
176
664
 
177
665
class TestWin32FuncsDirs(TestCaseInTempDir):
178
666
    """Test win32 functions that create files."""
179
667
    
180
668
    def test_getcwd(self):
 
669
        if win32utils.winver == 'Windows 98':
 
670
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
181
671
        # Make sure getcwd can handle unicode filenames
182
672
        try:
183
 
            os.mkdir(u'B\xe5gfors')
 
673
            os.mkdir(u'mu-\xb5')
184
674
        except UnicodeError:
185
675
            raise TestSkipped("Unable to create Unicode filename")
186
676
 
187
 
        os.chdir(u'B\xe5gfors')
 
677
        os.chdir(u'mu-\xb5')
188
678
        # TODO: jam 20060427 This will probably fail on Mac OSX because
189
679
        #       it will change the normalization of B\xe5gfors
190
680
        #       Consider using a different unicode character, or make
191
681
        #       osutils.getcwd() renormalize the path.
192
 
        self.assertTrue(osutils._win32_getcwd().endswith(u'/B\xe5gfors'))
 
682
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
683
 
 
684
    def test_minimum_path_selection(self):
 
685
        self.assertEqual(set(),
 
686
            osutils.minimum_path_selection([]))
 
687
        self.assertEqual(set(['a', 'b']),
 
688
            osutils.minimum_path_selection(['a', 'b']))
 
689
        self.assertEqual(set(['a/', 'b']),
 
690
            osutils.minimum_path_selection(['a/', 'b']))
 
691
        self.assertEqual(set(['a/', 'b']),
 
692
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
193
693
 
194
694
    def test_mkdtemp(self):
195
695
        tmpdir = osutils._win32_mkdtemp(dir='.')
208
708
        self.failIfExists('b')
209
709
        self.assertFileEqual('baz\n', 'a')
210
710
 
 
711
    def test_rename_missing_file(self):
 
712
        a = open('a', 'wb')
 
713
        a.write('foo\n')
 
714
        a.close()
 
715
 
 
716
        try:
 
717
            osutils._win32_rename('b', 'a')
 
718
        except (IOError, OSError), e:
 
719
            self.assertEqual(errno.ENOENT, e.errno)
 
720
        self.assertFileEqual('foo\n', 'a')
 
721
 
 
722
    def test_rename_missing_dir(self):
 
723
        os.mkdir('a')
 
724
        try:
 
725
            osutils._win32_rename('b', 'a')
 
726
        except (IOError, OSError), e:
 
727
            self.assertEqual(errno.ENOENT, e.errno)
 
728
 
 
729
    def test_rename_current_dir(self):
 
730
        os.mkdir('a')
 
731
        os.chdir('a')
 
732
        # You can't rename the working directory
 
733
        # doing rename non-existant . usually
 
734
        # just raises ENOENT, since non-existant
 
735
        # doesn't exist.
 
736
        try:
 
737
            osutils._win32_rename('b', '.')
 
738
        except (IOError, OSError), e:
 
739
            self.assertEqual(errno.ENOENT, e.errno)
 
740
 
 
741
    def test_splitpath(self):
 
742
        def check(expected, path):
 
743
            self.assertEqual(expected, osutils.splitpath(path))
 
744
 
 
745
        check(['a'], 'a')
 
746
        check(['a', 'b'], 'a/b')
 
747
        check(['a', 'b'], 'a/./b')
 
748
        check(['a', '.b'], 'a/.b')
 
749
        check(['a', '.b'], 'a\\.b')
 
750
 
 
751
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
752
 
 
753
 
 
754
class TestMacFuncsDirs(TestCaseInTempDir):
 
755
    """Test mac special functions that require directories."""
 
756
 
 
757
    def test_getcwd(self):
 
758
        # On Mac, this will actually create Ba\u030agfors
 
759
        # but chdir will still work, because it accepts both paths
 
760
        try:
 
761
            os.mkdir(u'B\xe5gfors')
 
762
        except UnicodeError:
 
763
            raise TestSkipped("Unable to create Unicode filename")
 
764
 
 
765
        os.chdir(u'B\xe5gfors')
 
766
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
767
 
 
768
    def test_getcwd_nonnorm(self):
 
769
        # Test that _mac_getcwd() will normalize this path
 
770
        try:
 
771
            os.mkdir(u'Ba\u030agfors')
 
772
        except UnicodeError:
 
773
            raise TestSkipped("Unable to create Unicode filename")
 
774
 
 
775
        os.chdir(u'Ba\u030agfors')
 
776
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
777
 
 
778
 
 
779
class TestChunksToLines(TestCase):
 
780
 
 
781
    def test_smoketest(self):
 
782
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
783
                         osutils.chunks_to_lines(['foo\nbar', '\nbaz\n']))
 
784
        self.assertEqual(['foo\n', 'bar\n', 'baz\n'],
 
785
                         osutils.chunks_to_lines(['foo\n', 'bar\n', 'baz\n']))
 
786
 
 
787
    def test_osutils_binding(self):
 
788
        from bzrlib.tests import test__chunks_to_lines
 
789
        if test__chunks_to_lines.CompiledChunksToLinesFeature.available():
 
790
            from bzrlib._chunks_to_lines_pyx import chunks_to_lines
 
791
        else:
 
792
            from bzrlib._chunks_to_lines_py import chunks_to_lines
 
793
        self.assertIs(chunks_to_lines, osutils.chunks_to_lines)
 
794
 
211
795
 
212
796
class TestSplitLines(TestCase):
213
797
 
235
819
            ]
236
820
        self.build_tree(tree)
237
821
        expected_dirblocks = [
238
 
                [
239
 
                    ('0file', '0file', 'file'),
240
 
                    ('1dir', '1dir', 'directory'),
241
 
                    ('2file', '2file', 'file'),
242
 
                ],
243
 
                [
244
 
                    ('1dir/0file', '0file', 'file'),
245
 
                    ('1dir/1dir', '1dir', 'directory'),
246
 
                ],
247
 
                [
248
 
                ],
249
 
            ]
250
 
        result = []
251
 
        found_bzrdir = False
252
 
        for dirblock in osutils.walkdirs('.'):
253
 
            if len(dirblock) and dirblock[0][1] == '.bzr':
254
 
                # this tests the filtering of selected paths
255
 
                found_bzrdir = True
256
 
                del dirblock[0]
257
 
            result.append(dirblock)
258
 
 
259
 
        self.assertTrue(found_bzrdir)
260
 
        self.assertEqual(expected_dirblocks,
261
 
            [[line[0:3] for line in block] for block in result])
262
 
        # you can search a subdir only, with a supplied prefix.
263
 
        result = []
264
 
        for dirblock in osutils.walkdirs('1dir', '1dir'):
265
 
            result.append(dirblock)
266
 
        self.assertEqual(expected_dirblocks[1:],
267
 
            [[line[0:3] for line in block] for block in result])
268
 
 
 
822
                (('', '.'),
 
823
                 [('0file', '0file', 'file'),
 
824
                  ('1dir', '1dir', 'directory'),
 
825
                  ('2file', '2file', 'file'),
 
826
                 ]
 
827
                ),
 
828
                (('1dir', './1dir'),
 
829
                 [('1dir/0file', '0file', 'file'),
 
830
                  ('1dir/1dir', '1dir', 'directory'),
 
831
                 ]
 
832
                ),
 
833
                (('1dir/1dir', './1dir/1dir'),
 
834
                 [
 
835
                 ]
 
836
                ),
 
837
            ]
 
838
        result = []
 
839
        found_bzrdir = False
 
840
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
841
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
842
                # this tests the filtering of selected paths
 
843
                found_bzrdir = True
 
844
                del dirblock[0]
 
845
            result.append((dirdetail, dirblock))
 
846
 
 
847
        self.assertTrue(found_bzrdir)
 
848
        self.assertEqual(expected_dirblocks,
 
849
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
850
        # you can search a subdir only, with a supplied prefix.
 
851
        result = []
 
852
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
853
            result.append(dirblock)
 
854
        self.assertEqual(expected_dirblocks[1:],
 
855
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
856
 
 
857
    def test__walkdirs_utf8(self):
 
858
        tree = [
 
859
            '.bzr',
 
860
            '0file',
 
861
            '1dir/',
 
862
            '1dir/0file',
 
863
            '1dir/1dir/',
 
864
            '2file'
 
865
            ]
 
866
        self.build_tree(tree)
 
867
        expected_dirblocks = [
 
868
                (('', '.'),
 
869
                 [('0file', '0file', 'file'),
 
870
                  ('1dir', '1dir', 'directory'),
 
871
                  ('2file', '2file', 'file'),
 
872
                 ]
 
873
                ),
 
874
                (('1dir', './1dir'),
 
875
                 [('1dir/0file', '0file', 'file'),
 
876
                  ('1dir/1dir', '1dir', 'directory'),
 
877
                 ]
 
878
                ),
 
879
                (('1dir/1dir', './1dir/1dir'),
 
880
                 [
 
881
                 ]
 
882
                ),
 
883
            ]
 
884
        result = []
 
885
        found_bzrdir = False
 
886
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
887
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
888
                # this tests the filtering of selected paths
 
889
                found_bzrdir = True
 
890
                del dirblock[0]
 
891
            result.append((dirdetail, dirblock))
 
892
 
 
893
        self.assertTrue(found_bzrdir)
 
894
        self.assertEqual(expected_dirblocks,
 
895
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
896
        # you can search a subdir only, with a supplied prefix.
 
897
        result = []
 
898
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
899
            result.append(dirblock)
 
900
        self.assertEqual(expected_dirblocks[1:],
 
901
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
902
 
 
903
    def _filter_out_stat(self, result):
 
904
        """Filter out the stat value from the walkdirs result"""
 
905
        for dirdetail, dirblock in result:
 
906
            new_dirblock = []
 
907
            for info in dirblock:
 
908
                # Ignore info[3] which is the stat
 
909
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
910
            dirblock[:] = new_dirblock
 
911
 
 
912
    def _save_platform_info(self):
 
913
        cur_winver = win32utils.winver
 
914
        cur_fs_enc = osutils._fs_enc
 
915
        cur_dir_reader = osutils._selected_dir_reader
 
916
        def restore():
 
917
            win32utils.winver = cur_winver
 
918
            osutils._fs_enc = cur_fs_enc
 
919
            osutils._selected_dir_reader = cur_dir_reader
 
920
        self.addCleanup(restore)
 
921
 
 
922
    def assertReadFSDirIs(self, expected):
 
923
        """Assert the right implementation for _walkdirs_utf8 is chosen."""
 
924
        # Force it to redetect
 
925
        osutils._selected_dir_reader = None
 
926
        # Nothing to list, but should still trigger the selection logic
 
927
        self.assertEqual([(('', '.'), [])], list(osutils._walkdirs_utf8('.')))
 
928
        self.assertIsInstance(osutils._selected_dir_reader, expected)
 
929
 
 
930
    def test_force_walkdirs_utf8_fs_utf8(self):
 
931
        self.requireFeature(UTF8DirReaderFeature)
 
932
        self._save_platform_info()
 
933
        win32utils.winver = None # Avoid the win32 detection code
 
934
        osutils._fs_enc = 'UTF-8'
 
935
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
936
 
 
937
    def test_force_walkdirs_utf8_fs_ascii(self):
 
938
        self.requireFeature(UTF8DirReaderFeature)
 
939
        self._save_platform_info()
 
940
        win32utils.winver = None # Avoid the win32 detection code
 
941
        osutils._fs_enc = 'US-ASCII'
 
942
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
943
 
 
944
    def test_force_walkdirs_utf8_fs_ANSI(self):
 
945
        self.requireFeature(UTF8DirReaderFeature)
 
946
        self._save_platform_info()
 
947
        win32utils.winver = None # Avoid the win32 detection code
 
948
        osutils._fs_enc = 'ANSI_X3.4-1968'
 
949
        self.assertReadFSDirIs(UTF8DirReaderFeature.reader)
 
950
 
 
951
    def test_force_walkdirs_utf8_fs_latin1(self):
 
952
        self._save_platform_info()
 
953
        win32utils.winver = None # Avoid the win32 detection code
 
954
        osutils._fs_enc = 'latin1'
 
955
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
 
956
 
 
957
    def test_force_walkdirs_utf8_nt(self):
 
958
        # Disabled because the thunk of the whole walkdirs api is disabled.
 
959
        self.requireFeature(Win32ReadDirFeature)
 
960
        self._save_platform_info()
 
961
        win32utils.winver = 'Windows NT'
 
962
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
963
        self.assertReadFSDirIs(Win32ReadDir)
 
964
 
 
965
    def test_force_walkdirs_utf8_98(self):
 
966
        self.requireFeature(Win32ReadDirFeature)
 
967
        self._save_platform_info()
 
968
        win32utils.winver = 'Windows 98'
 
969
        self.assertReadFSDirIs(osutils.UnicodeDirReader)
 
970
 
 
971
    def test_unicode_walkdirs(self):
 
972
        """Walkdirs should always return unicode paths."""
 
973
        name0 = u'0file-\xb6'
 
974
        name1 = u'1dir-\u062c\u0648'
 
975
        name2 = u'2file-\u0633'
 
976
        tree = [
 
977
            name0,
 
978
            name1 + '/',
 
979
            name1 + '/' + name0,
 
980
            name1 + '/' + name1 + '/',
 
981
            name2,
 
982
            ]
 
983
        try:
 
984
            self.build_tree(tree)
 
985
        except UnicodeError:
 
986
            raise TestSkipped('Could not represent Unicode chars'
 
987
                              ' in current encoding.')
 
988
        expected_dirblocks = [
 
989
                ((u'', u'.'),
 
990
                 [(name0, name0, 'file', './' + name0),
 
991
                  (name1, name1, 'directory', './' + name1),
 
992
                  (name2, name2, 'file', './' + name2),
 
993
                 ]
 
994
                ),
 
995
                ((name1, './' + name1),
 
996
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
997
                                                        + '/' + name0),
 
998
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
999
                                                            + '/' + name1),
 
1000
                 ]
 
1001
                ),
 
1002
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1003
                 [
 
1004
                 ]
 
1005
                ),
 
1006
            ]
 
1007
        result = list(osutils.walkdirs('.'))
 
1008
        self._filter_out_stat(result)
 
1009
        self.assertEqual(expected_dirblocks, result)
 
1010
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
1011
        self._filter_out_stat(result)
 
1012
        self.assertEqual(expected_dirblocks[1:], result)
 
1013
 
 
1014
    def test_unicode__walkdirs_utf8(self):
 
1015
        """Walkdirs_utf8 should always return utf8 paths.
 
1016
 
 
1017
        The abspath portion might be in unicode or utf-8
 
1018
        """
 
1019
        name0 = u'0file-\xb6'
 
1020
        name1 = u'1dir-\u062c\u0648'
 
1021
        name2 = u'2file-\u0633'
 
1022
        tree = [
 
1023
            name0,
 
1024
            name1 + '/',
 
1025
            name1 + '/' + name0,
 
1026
            name1 + '/' + name1 + '/',
 
1027
            name2,
 
1028
            ]
 
1029
        try:
 
1030
            self.build_tree(tree)
 
1031
        except UnicodeError:
 
1032
            raise TestSkipped('Could not represent Unicode chars'
 
1033
                              ' in current encoding.')
 
1034
        name0 = name0.encode('utf8')
 
1035
        name1 = name1.encode('utf8')
 
1036
        name2 = name2.encode('utf8')
 
1037
 
 
1038
        expected_dirblocks = [
 
1039
                (('', '.'),
 
1040
                 [(name0, name0, 'file', './' + name0),
 
1041
                  (name1, name1, 'directory', './' + name1),
 
1042
                  (name2, name2, 'file', './' + name2),
 
1043
                 ]
 
1044
                ),
 
1045
                ((name1, './' + name1),
 
1046
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
1047
                                                        + '/' + name0),
 
1048
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
1049
                                                            + '/' + name1),
 
1050
                 ]
 
1051
                ),
 
1052
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
1053
                 [
 
1054
                 ]
 
1055
                ),
 
1056
            ]
 
1057
        result = []
 
1058
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
1059
        # all abspaths are Unicode, and encode them back into utf8.
 
1060
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
1061
            self.assertIsInstance(dirdetail[0], str)
 
1062
            if isinstance(dirdetail[1], unicode):
 
1063
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
1064
                dirblock = [list(info) for info in dirblock]
 
1065
                for info in dirblock:
 
1066
                    self.assertIsInstance(info[4], unicode)
 
1067
                    info[4] = info[4].encode('utf8')
 
1068
            new_dirblock = []
 
1069
            for info in dirblock:
 
1070
                self.assertIsInstance(info[0], str)
 
1071
                self.assertIsInstance(info[1], str)
 
1072
                self.assertIsInstance(info[4], str)
 
1073
                # Remove the stat information
 
1074
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
1075
            result.append((dirdetail, new_dirblock))
 
1076
        self.assertEqual(expected_dirblocks, result)
 
1077
 
 
1078
    def test__walkdirs_utf8_with_unicode_fs(self):
 
1079
        """UnicodeDirReader should be a safe fallback everywhere
 
1080
 
 
1081
        The abspath portion should be in unicode
 
1082
        """
 
1083
        # Use the unicode reader. TODO: split into driver-and-driven unit
 
1084
        # tests.
 
1085
        self._save_platform_info()
 
1086
        osutils._selected_dir_reader = osutils.UnicodeDirReader()
 
1087
        name0u = u'0file-\xb6'
 
1088
        name1u = u'1dir-\u062c\u0648'
 
1089
        name2u = u'2file-\u0633'
 
1090
        tree = [
 
1091
            name0u,
 
1092
            name1u + '/',
 
1093
            name1u + '/' + name0u,
 
1094
            name1u + '/' + name1u + '/',
 
1095
            name2u,
 
1096
            ]
 
1097
        try:
 
1098
            self.build_tree(tree)
 
1099
        except UnicodeError:
 
1100
            raise TestSkipped('Could not represent Unicode chars'
 
1101
                              ' in current encoding.')
 
1102
        name0 = name0u.encode('utf8')
 
1103
        name1 = name1u.encode('utf8')
 
1104
        name2 = name2u.encode('utf8')
 
1105
 
 
1106
        # All of the abspaths should be in unicode, all of the relative paths
 
1107
        # should be in utf8
 
1108
        expected_dirblocks = [
 
1109
                (('', '.'),
 
1110
                 [(name0, name0, 'file', './' + name0u),
 
1111
                  (name1, name1, 'directory', './' + name1u),
 
1112
                  (name2, name2, 'file', './' + name2u),
 
1113
                 ]
 
1114
                ),
 
1115
                ((name1, './' + name1u),
 
1116
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1117
                                                        + '/' + name0u),
 
1118
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1119
                                                            + '/' + name1u),
 
1120
                 ]
 
1121
                ),
 
1122
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1123
                 [
 
1124
                 ]
 
1125
                ),
 
1126
            ]
 
1127
        result = list(osutils._walkdirs_utf8('.'))
 
1128
        self._filter_out_stat(result)
 
1129
        self.assertEqual(expected_dirblocks, result)
 
1130
 
 
1131
    def test__walkdirs_utf8_win32readdir(self):
 
1132
        self.requireFeature(Win32ReadDirFeature)
 
1133
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1134
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1135
        self._save_platform_info()
 
1136
        osutils._selected_dir_reader = Win32ReadDir()
 
1137
        name0u = u'0file-\xb6'
 
1138
        name1u = u'1dir-\u062c\u0648'
 
1139
        name2u = u'2file-\u0633'
 
1140
        tree = [
 
1141
            name0u,
 
1142
            name1u + '/',
 
1143
            name1u + '/' + name0u,
 
1144
            name1u + '/' + name1u + '/',
 
1145
            name2u,
 
1146
            ]
 
1147
        self.build_tree(tree)
 
1148
        name0 = name0u.encode('utf8')
 
1149
        name1 = name1u.encode('utf8')
 
1150
        name2 = name2u.encode('utf8')
 
1151
 
 
1152
        # All of the abspaths should be in unicode, all of the relative paths
 
1153
        # should be in utf8
 
1154
        expected_dirblocks = [
 
1155
                (('', '.'),
 
1156
                 [(name0, name0, 'file', './' + name0u),
 
1157
                  (name1, name1, 'directory', './' + name1u),
 
1158
                  (name2, name2, 'file', './' + name2u),
 
1159
                 ]
 
1160
                ),
 
1161
                ((name1, './' + name1u),
 
1162
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
1163
                                                        + '/' + name0u),
 
1164
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
1165
                                                            + '/' + name1u),
 
1166
                 ]
 
1167
                ),
 
1168
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
1169
                 [
 
1170
                 ]
 
1171
                ),
 
1172
            ]
 
1173
        result = list(osutils._walkdirs_utf8(u'.'))
 
1174
        self._filter_out_stat(result)
 
1175
        self.assertEqual(expected_dirblocks, result)
 
1176
 
 
1177
    def assertStatIsCorrect(self, path, win32stat):
 
1178
        os_stat = os.stat(path)
 
1179
        self.assertEqual(os_stat.st_size, win32stat.st_size)
 
1180
        self.assertAlmostEqual(os_stat.st_mtime, win32stat.st_mtime, places=4)
 
1181
        self.assertAlmostEqual(os_stat.st_ctime, win32stat.st_ctime, places=4)
 
1182
        self.assertAlmostEqual(os_stat.st_atime, win32stat.st_atime, places=4)
 
1183
        self.assertEqual(os_stat.st_dev, win32stat.st_dev)
 
1184
        self.assertEqual(os_stat.st_ino, win32stat.st_ino)
 
1185
        self.assertEqual(os_stat.st_mode, win32stat.st_mode)
 
1186
 
 
1187
    def test__walkdirs_utf_win32_find_file_stat_file(self):
 
1188
        """make sure our Stat values are valid"""
 
1189
        self.requireFeature(Win32ReadDirFeature)
 
1190
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1191
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1192
        name0u = u'0file-\xb6'
 
1193
        name0 = name0u.encode('utf8')
 
1194
        self.build_tree([name0u])
 
1195
        # I hate to sleep() here, but I'm trying to make the ctime different
 
1196
        # from the mtime
 
1197
        time.sleep(2)
 
1198
        f = open(name0u, 'ab')
 
1199
        try:
 
1200
            f.write('just a small update')
 
1201
        finally:
 
1202
            f.close()
 
1203
 
 
1204
        result = Win32ReadDir().read_dir('', u'.')
 
1205
        entry = result[0]
 
1206
        self.assertEqual((name0, name0, 'file'), entry[:3])
 
1207
        self.assertEqual(u'./' + name0u, entry[4])
 
1208
        self.assertStatIsCorrect(entry[4], entry[3])
 
1209
        self.assertNotEqual(entry[3].st_mtime, entry[3].st_ctime)
 
1210
 
 
1211
    def test__walkdirs_utf_win32_find_file_stat_directory(self):
 
1212
        """make sure our Stat values are valid"""
 
1213
        self.requireFeature(Win32ReadDirFeature)
 
1214
        self.requireFeature(tests.UnicodeFilenameFeature)
 
1215
        from bzrlib._walkdirs_win32 import Win32ReadDir
 
1216
        name0u = u'0dir-\u062c\u0648'
 
1217
        name0 = name0u.encode('utf8')
 
1218
        self.build_tree([name0u + '/'])
 
1219
 
 
1220
        result = Win32ReadDir().read_dir('', u'.')
 
1221
        entry = result[0]
 
1222
        self.assertEqual((name0, name0, 'directory'), entry[:3])
 
1223
        self.assertEqual(u'./' + name0u, entry[4])
 
1224
        self.assertStatIsCorrect(entry[4], entry[3])
 
1225
 
 
1226
    def assertPathCompare(self, path_less, path_greater):
 
1227
        """check that path_less and path_greater compare correctly."""
 
1228
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1229
            path_less, path_less))
 
1230
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
1231
            path_greater, path_greater))
 
1232
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
1233
            path_less, path_greater))
 
1234
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
1235
            path_greater, path_less))
 
1236
 
 
1237
    def test_compare_paths_prefix_order(self):
 
1238
        # root before all else
 
1239
        self.assertPathCompare("/", "/a")
 
1240
        # alpha within a dir
 
1241
        self.assertPathCompare("/a", "/b")
 
1242
        self.assertPathCompare("/b", "/z")
 
1243
        # high dirs before lower.
 
1244
        self.assertPathCompare("/z", "/a/a")
 
1245
        # except if the deeper dir should be output first
 
1246
        self.assertPathCompare("/a/b/c", "/d/g")
 
1247
        # lexical betwen dirs of the same height
 
1248
        self.assertPathCompare("/a/z", "/z/z")
 
1249
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1250
 
 
1251
        # this should also be consistent for no leading / paths
 
1252
        # root before all else
 
1253
        self.assertPathCompare("", "a")
 
1254
        # alpha within a dir
 
1255
        self.assertPathCompare("a", "b")
 
1256
        self.assertPathCompare("b", "z")
 
1257
        # high dirs before lower.
 
1258
        self.assertPathCompare("z", "a/a")
 
1259
        # except if the deeper dir should be output first
 
1260
        self.assertPathCompare("a/b/c", "d/g")
 
1261
        # lexical betwen dirs of the same height
 
1262
        self.assertPathCompare("a/z", "z/z")
 
1263
        self.assertPathCompare("a/c/z", "a/d/e")
 
1264
 
 
1265
    def test_path_prefix_sorting(self):
 
1266
        """Doing a sort on path prefix should match our sample data."""
 
1267
        original_paths = [
 
1268
            'a',
 
1269
            'a/b',
 
1270
            'a/b/c',
 
1271
            'b',
 
1272
            'b/c',
 
1273
            'd',
 
1274
            'd/e',
 
1275
            'd/e/f',
 
1276
            'd/f',
 
1277
            'd/g',
 
1278
            'g',
 
1279
            ]
 
1280
 
 
1281
        dir_sorted_paths = [
 
1282
            'a',
 
1283
            'b',
 
1284
            'd',
 
1285
            'g',
 
1286
            'a/b',
 
1287
            'a/b/c',
 
1288
            'b/c',
 
1289
            'd/e',
 
1290
            'd/f',
 
1291
            'd/g',
 
1292
            'd/e/f',
 
1293
            ]
 
1294
 
 
1295
        self.assertEqual(
 
1296
            dir_sorted_paths,
 
1297
            sorted(original_paths, key=osutils.path_prefix_key))
 
1298
        # using the comparison routine shoudl work too:
 
1299
        self.assertEqual(
 
1300
            dir_sorted_paths,
 
1301
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1302
 
 
1303
 
 
1304
class TestCopyTree(TestCaseInTempDir):
 
1305
    
 
1306
    def test_copy_basic_tree(self):
 
1307
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1308
        osutils.copy_tree('source', 'target')
 
1309
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1310
        self.assertEqual(['c'], os.listdir('target/b'))
 
1311
 
 
1312
    def test_copy_tree_target_exists(self):
 
1313
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1314
                         'target/'])
 
1315
        osutils.copy_tree('source', 'target')
 
1316
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1317
        self.assertEqual(['c'], os.listdir('target/b'))
 
1318
 
 
1319
    def test_copy_tree_symlinks(self):
 
1320
        self.requireFeature(SymlinkFeature)
 
1321
        self.build_tree(['source/'])
 
1322
        os.symlink('a/generic/path', 'source/lnk')
 
1323
        osutils.copy_tree('source', 'target')
 
1324
        self.assertEqual(['lnk'], os.listdir('target'))
 
1325
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1326
 
 
1327
    def test_copy_tree_handlers(self):
 
1328
        processed_files = []
 
1329
        processed_links = []
 
1330
        def file_handler(from_path, to_path):
 
1331
            processed_files.append(('f', from_path, to_path))
 
1332
        def dir_handler(from_path, to_path):
 
1333
            processed_files.append(('d', from_path, to_path))
 
1334
        def link_handler(from_path, to_path):
 
1335
            processed_links.append((from_path, to_path))
 
1336
        handlers = {'file':file_handler,
 
1337
                    'directory':dir_handler,
 
1338
                    'symlink':link_handler,
 
1339
                   }
 
1340
 
 
1341
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1342
        if osutils.has_symlinks():
 
1343
            os.symlink('a/generic/path', 'source/lnk')
 
1344
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1345
 
 
1346
        self.assertEqual([('d', 'source', 'target'),
 
1347
                          ('f', 'source/a', 'target/a'),
 
1348
                          ('d', 'source/b', 'target/b'),
 
1349
                          ('f', 'source/b/c', 'target/b/c'),
 
1350
                         ], processed_files)
 
1351
        self.failIfExists('target')
 
1352
        if osutils.has_symlinks():
 
1353
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1354
 
 
1355
 
 
1356
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1357
# [bialix] 2006/12/26
 
1358
 
 
1359
 
 
1360
class TestSetUnsetEnv(TestCase):
 
1361
    """Test updating the environment"""
 
1362
 
 
1363
    def setUp(self):
 
1364
        super(TestSetUnsetEnv, self).setUp()
 
1365
 
 
1366
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1367
                         'Environment was not cleaned up properly.'
 
1368
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1369
        def cleanup():
 
1370
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1371
                del os.environ['BZR_TEST_ENV_VAR']
 
1372
 
 
1373
        self.addCleanup(cleanup)
 
1374
 
 
1375
    def test_set(self):
 
1376
        """Test that we can set an env variable"""
 
1377
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1378
        self.assertEqual(None, old)
 
1379
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1380
 
 
1381
    def test_double_set(self):
 
1382
        """Test that we get the old value out"""
 
1383
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1384
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1385
        self.assertEqual('foo', old)
 
1386
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1387
 
 
1388
    def test_unicode(self):
 
1389
        """Environment can only contain plain strings
 
1390
        
 
1391
        So Unicode strings must be encoded.
 
1392
        """
 
1393
        uni_val, env_val = probe_unicode_in_user_encoding()
 
1394
        if uni_val is None:
 
1395
            raise TestSkipped('Cannot find a unicode character that works in'
 
1396
                              ' encoding %s' % (osutils.get_user_encoding(),))
 
1397
 
 
1398
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1399
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1400
 
 
1401
    def test_unset(self):
 
1402
        """Test that passing None will remove the env var"""
 
1403
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1404
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1405
        self.assertEqual('foo', old)
 
1406
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1407
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1408
 
 
1409
 
 
1410
class TestLocalTimeOffset(TestCase):
 
1411
 
 
1412
    def test_local_time_offset(self):
 
1413
        """Test that local_time_offset() returns a sane value."""
 
1414
        offset = osutils.local_time_offset()
 
1415
        self.assertTrue(isinstance(offset, int))
 
1416
        # Test that the offset is no more than a eighteen hours in
 
1417
        # either direction.
 
1418
        # Time zone handling is system specific, so it is difficult to
 
1419
        # do more specific tests, but a value outside of this range is
 
1420
        # probably wrong.
 
1421
        eighteen_hours = 18 * 3600
 
1422
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1423
 
 
1424
    def test_local_time_offset_with_timestamp(self):
 
1425
        """Test that local_time_offset() works with a timestamp."""
 
1426
        offset = osutils.local_time_offset(1000000000.1234567)
 
1427
        self.assertTrue(isinstance(offset, int))
 
1428
        eighteen_hours = 18 * 3600
 
1429
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1430
 
 
1431
 
 
1432
class TestShaFileByName(TestCaseInTempDir):
 
1433
 
 
1434
    def test_sha_empty(self):
 
1435
        self.build_tree_contents([('foo', '')])
 
1436
        expected_sha = osutils.sha_string('')
 
1437
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1438
 
 
1439
    def test_sha_mixed_endings(self):
 
1440
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1441
        self.build_tree_contents([('foo', text)])
 
1442
        expected_sha = osutils.sha_string(text)
 
1443
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1444
 
 
1445
 
 
1446
_debug_text = \
 
1447
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1448
#
 
1449
# This program is free software; you can redistribute it and/or modify
 
1450
# it under the terms of the GNU General Public License as published by
 
1451
# the Free Software Foundation; either version 2 of the License, or
 
1452
# (at your option) any later version.
 
1453
#
 
1454
# This program is distributed in the hope that it will be useful,
 
1455
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1456
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1457
# GNU General Public License for more details.
 
1458
#
 
1459
# You should have received a copy of the GNU General Public License
 
1460
# along with this program; if not, write to the Free Software
 
1461
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1462
 
 
1463
 
 
1464
# NOTE: If update these, please also update the help for global-options in
 
1465
#       bzrlib/help_topics/__init__.py
 
1466
 
 
1467
debug_flags = set()
 
1468
"""Set of flags that enable different debug behaviour.
 
1469
 
 
1470
These are set with eg ``-Dlock`` on the bzr command line.
 
1471
 
 
1472
Options include:
 
1473
 
 
1474
 * auth - show authentication sections used
 
1475
 * error - show stack traces for all top level exceptions
 
1476
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1477
 * fetch - trace history copying between repositories
 
1478
 * graph - trace graph traversal information
 
1479
 * hashcache - log every time a working file is read to determine its hash
 
1480
 * hooks - trace hook execution
 
1481
 * hpss - trace smart protocol requests and responses
 
1482
 * http - trace http connections, requests and responses
 
1483
 * index - trace major index operations
 
1484
 * knit - trace knit operations
 
1485
 * lock - trace when lockdir locks are taken or released
 
1486
 * merge - emit information for debugging merges
 
1487
 * pack - emit information about pack operations
 
1488
 
 
1489
"""
 
1490
'''
 
1491
 
 
1492
 
 
1493
class TestResourceLoading(TestCaseInTempDir):
 
1494
 
 
1495
    def test_resource_string(self):
 
1496
        # test resource in bzrlib
 
1497
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1498
        self.assertEquals(_debug_text, text)
 
1499
        # test resource under bzrlib
 
1500
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1501
        self.assertContainsRe(text, "class TextUIFactory")
 
1502
        # test unsupported package
 
1503
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1504
            'yyy.xx')
 
1505
        # test unknown resource
 
1506
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')