~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-05-29 15:57:16 UTC
  • mfrom: (3427.5.9 dep_warnings)
  • Revision ID: pqm@pqm.ubuntu.com-20080529155716-0w3kic8lioa63231
(jam) Enable Deprecation Warnings when running -Werror and when
        running selftest

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
 
"""Tests for the osutils wrapper.
18
 
"""
 
17
"""Tests for the osutils wrapper."""
19
18
 
 
19
import errno
20
20
import os
 
21
import socket
 
22
import stat
21
23
import sys
22
24
 
23
25
import bzrlib
24
 
from bzrlib.errors import BzrBadParameterNotUnicode
25
 
import bzrlib.osutils as osutils
26
 
from bzrlib.tests import TestCaseInTempDir, TestCase
27
 
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    osutils,
 
29
    win32utils,
 
30
    )
 
31
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
32
from bzrlib.osutils import (
 
33
        is_inside_any,
 
34
        is_inside_or_parent_of_any,
 
35
        pathjoin,
 
36
        pumpfile,
 
37
        )
 
38
from bzrlib.tests import (
 
39
        probe_unicode_in_user_encoding,
 
40
        StringIOWrapper,
 
41
        SymlinkFeature,
 
42
        TestCase,
 
43
        TestCaseInTempDir,
 
44
        TestSkipped,
 
45
        )
 
46
from bzrlib.tests.file_utils import (
 
47
    FakeReadFile,
 
48
    )
 
49
from cStringIO import StringIO
28
50
 
29
51
class TestOSUtils(TestCaseInTempDir):
30
52
 
 
53
    def test_contains_whitespace(self):
 
54
        self.failUnless(osutils.contains_whitespace(u' '))
 
55
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
56
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
57
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
58
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
59
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
60
 
 
61
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
62
        # is whitespace, but we do not.
 
63
        self.failIf(osutils.contains_whitespace(u''))
 
64
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
65
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
66
 
31
67
    def test_fancy_rename(self):
32
68
        # This should work everywhere
33
69
        def rename(a, b):
61
97
 
62
98
    # TODO: test fancy_rename using a MemoryTransport
63
99
 
 
100
    def test_rename_change_case(self):
 
101
        # on Windows we should be able to change filename case by rename
 
102
        self.build_tree(['a', 'b/'])
 
103
        osutils.rename('a', 'A')
 
104
        osutils.rename('b', 'B')
 
105
        # we can't use failUnlessExists on case-insensitive filesystem
 
106
        # so try to check shape of the tree
 
107
        shape = sorted(os.listdir('.'))
 
108
        self.assertEquals(['A', 'B'], shape)
 
109
 
64
110
    def test_01_rand_chars_empty(self):
65
111
        result = osutils.rand_chars(0)
66
112
        self.assertEqual(result, '')
71
117
        self.assertEqual(type(result), str)
72
118
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
73
119
 
 
120
    def test_is_inside(self):
 
121
        is_inside = osutils.is_inside
 
122
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
123
        self.assertFalse(is_inside('src', 'srccontrol'))
 
124
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
125
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
126
        self.assertFalse(is_inside('foo.c', ''))
 
127
        self.assertTrue(is_inside('', 'foo.c'))
 
128
 
 
129
    def test_is_inside_any(self):
 
130
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
131
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
132
                         (['src'], SRC_FOO_C),
 
133
                         (['src'], 'src'),
 
134
                         ]:
 
135
            self.assert_(is_inside_any(dirs, fn))
 
136
        for dirs, fn in [(['src'], 'srccontrol'),
 
137
                         (['src'], 'srccontrol/foo')]:
 
138
            self.assertFalse(is_inside_any(dirs, fn))
 
139
 
 
140
    def test_is_inside_or_parent_of_any(self):
 
141
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
142
                         (['src'], 'src/foo.c'),
 
143
                         (['src/bar.c'], 'src'),
 
144
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
145
                         (['src'], 'src'),
 
146
                         ]:
 
147
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
148
            
 
149
        for dirs, fn in [(['src'], 'srccontrol'),
 
150
                         (['srccontrol/foo.c'], 'src'),
 
151
                         (['src'], 'srccontrol/foo')]:
 
152
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
 
153
 
 
154
    def test_rmtree(self):
 
155
        # Check to remove tree with read-only files/dirs
 
156
        os.mkdir('dir')
 
157
        f = file('dir/file', 'w')
 
158
        f.write('spam')
 
159
        f.close()
 
160
        # would like to also try making the directory readonly, but at the
 
161
        # moment python shutil.rmtree doesn't handle that properly - it would
 
162
        # need to chmod the directory before removing things inside it - deferred
 
163
        # for now -- mbp 20060505
 
164
        # osutils.make_readonly('dir')
 
165
        osutils.make_readonly('dir/file')
 
166
 
 
167
        osutils.rmtree('dir')
 
168
 
 
169
        self.failIfExists('dir/file')
 
170
        self.failIfExists('dir')
 
171
 
 
172
    def test_file_kind(self):
 
173
        self.build_tree(['file', 'dir/'])
 
174
        self.assertEquals('file', osutils.file_kind('file'))
 
175
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
176
        if osutils.has_symlinks():
 
177
            os.symlink('symlink', 'symlink')
 
178
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
179
        
 
180
        # TODO: jam 20060529 Test a block device
 
181
        try:
 
182
            os.lstat('/dev/null')
 
183
        except OSError, e:
 
184
            if e.errno not in (errno.ENOENT,):
 
185
                raise
 
186
        else:
 
187
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
188
 
 
189
        mkfifo = getattr(os, 'mkfifo', None)
 
190
        if mkfifo:
 
191
            mkfifo('fifo')
 
192
            try:
 
193
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
194
            finally:
 
195
                os.remove('fifo')
 
196
 
 
197
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
198
        if AF_UNIX:
 
199
            s = socket.socket(AF_UNIX)
 
200
            s.bind('socket')
 
201
            try:
 
202
                self.assertEquals('socket', osutils.file_kind('socket'))
 
203
            finally:
 
204
                os.remove('socket')
 
205
 
 
206
    def test_kind_marker(self):
 
207
        self.assertEqual(osutils.kind_marker('file'), '')
 
208
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
209
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
210
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
 
211
 
 
212
    def test_get_umask(self):
 
213
        if sys.platform == 'win32':
 
214
            # umask always returns '0', no way to set it
 
215
            self.assertEqual(0, osutils.get_umask())
 
216
            return
 
217
 
 
218
        orig_umask = osutils.get_umask()
 
219
        try:
 
220
            os.umask(0222)
 
221
            self.assertEqual(0222, osutils.get_umask())
 
222
            os.umask(0022)
 
223
            self.assertEqual(0022, osutils.get_umask())
 
224
            os.umask(0002)
 
225
            self.assertEqual(0002, osutils.get_umask())
 
226
            os.umask(0027)
 
227
            self.assertEqual(0027, osutils.get_umask())
 
228
        finally:
 
229
            os.umask(orig_umask)
 
230
 
 
231
    def assertFormatedDelta(self, expected, seconds):
 
232
        """Assert osutils.format_delta formats as expected"""
 
233
        actual = osutils.format_delta(seconds)
 
234
        self.assertEqual(expected, actual)
 
235
 
 
236
    def test_format_delta(self):
 
237
        self.assertFormatedDelta('0 seconds ago', 0)
 
238
        self.assertFormatedDelta('1 second ago', 1)
 
239
        self.assertFormatedDelta('10 seconds ago', 10)
 
240
        self.assertFormatedDelta('59 seconds ago', 59)
 
241
        self.assertFormatedDelta('89 seconds ago', 89)
 
242
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
243
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
244
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
245
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
246
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
247
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
248
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
249
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
250
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
251
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
252
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
253
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
254
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
255
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
256
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
257
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
258
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
259
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
260
 
 
261
        # We handle when time steps the wrong direction because computers
 
262
        # don't have synchronized clocks.
 
263
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
264
        self.assertFormatedDelta('1 second in the future', -1)
 
265
        self.assertFormatedDelta('2 seconds in the future', -2)
 
266
 
 
267
    def test_format_date(self):
 
268
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
269
            osutils.format_date, 0, timezone='foo')
 
270
 
 
271
    def test_dereference_path(self):
 
272
        self.requireFeature(SymlinkFeature)
 
273
        cwd = osutils.realpath('.')
 
274
        os.mkdir('bar')
 
275
        bar_path = osutils.pathjoin(cwd, 'bar')
 
276
        # Using './' to avoid bug #1213894 (first path component not
 
277
        # dereferenced) in Python 2.4.1 and earlier
 
278
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
279
        os.symlink('bar', 'foo')
 
280
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
281
        
 
282
        # Does not dereference terminal symlinks
 
283
        foo_path = osutils.pathjoin(cwd, 'foo')
 
284
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
285
 
 
286
        # Dereferences parent symlinks
 
287
        os.mkdir('bar/baz')
 
288
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
289
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
290
 
 
291
        # Dereferences parent symlinks that are the first path element
 
292
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
293
 
 
294
        # Dereferences parent symlinks in absolute paths
 
295
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
296
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
297
 
 
298
    def test_changing_access(self):
 
299
        f = file('file', 'w')
 
300
        f.write('monkey')
 
301
        f.close()
 
302
 
 
303
        # Make a file readonly
 
304
        osutils.make_readonly('file')
 
305
        mode = os.lstat('file').st_mode
 
306
        self.assertEqual(mode, mode & 0777555)
 
307
 
 
308
        # Make a file writable
 
309
        osutils.make_writable('file')
 
310
        mode = os.lstat('file').st_mode
 
311
        self.assertEqual(mode, mode | 0200)
 
312
 
 
313
        if osutils.has_symlinks():
 
314
            # should not error when handed a symlink
 
315
            os.symlink('nonexistent', 'dangling')
 
316
            osutils.make_readonly('dangling')
 
317
            osutils.make_writable('dangling')
 
318
 
 
319
    def test_kind_marker(self):
 
320
        self.assertEqual("", osutils.kind_marker("file"))
 
321
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
322
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
323
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
324
 
 
325
 
 
326
class TestPumpFile(TestCase):
 
327
    """Test pumpfile method."""
 
328
    def setUp(self):
 
329
        # create a test datablock
 
330
        self.block_size = 512
 
331
        pattern = '0123456789ABCDEF'
 
332
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
333
        self.test_data_len = len(self.test_data)
 
334
 
 
335
    def test_bracket_block_size(self):
 
336
        """Read data in blocks with the requested read size bracketing the
 
337
        block size."""
 
338
        # make sure test data is larger than max read size
 
339
        self.assertTrue(self.test_data_len > self.block_size)
 
340
 
 
341
        from_file = FakeReadFile(self.test_data)
 
342
        to_file = StringIO()
 
343
 
 
344
        # read (max / 2) bytes and verify read size wasn't affected
 
345
        num_bytes_to_read = self.block_size / 2
 
346
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
347
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
348
        self.assertEqual(from_file.get_read_count(), 1)
 
349
 
 
350
        # read (max) bytes and verify read size wasn't affected
 
351
        num_bytes_to_read = self.block_size
 
352
        from_file.reset_read_count()
 
353
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
354
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
355
        self.assertEqual(from_file.get_read_count(), 1)
 
356
 
 
357
        # read (max + 1) bytes and verify read size was limited
 
358
        num_bytes_to_read = self.block_size + 1
 
359
        from_file.reset_read_count()
 
360
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
361
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
362
        self.assertEqual(from_file.get_read_count(), 2)
 
363
 
 
364
        # finish reading the rest of the data
 
365
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
366
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
367
 
 
368
        # report error if the data wasn't equal (we only report the size due
 
369
        # to the length of the data)
 
370
        response_data = to_file.getvalue()
 
371
        if response_data != self.test_data:
 
372
            message = "Data not equal.  Expected %d bytes, received %d."
 
373
            self.fail(message % (len(response_data), self.test_data_len))
 
374
 
 
375
    def test_specified_size(self):
 
376
        """Request a transfer larger than the maximum block size and verify
 
377
        that the maximum read doesn't exceed the block_size."""
 
378
        # make sure test data is larger than max read size
 
379
        self.assertTrue(self.test_data_len > self.block_size)
 
380
 
 
381
        # retrieve data in blocks
 
382
        from_file = FakeReadFile(self.test_data)
 
383
        to_file = StringIO()
 
384
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
 
385
 
 
386
        # verify read size was equal to the maximum read size
 
387
        self.assertTrue(from_file.get_max_read_size() > 0)
 
388
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
389
        self.assertEqual(from_file.get_read_count(), 3)
 
390
 
 
391
        # report error if the data wasn't equal (we only report the size due
 
392
        # to the length of the data)
 
393
        response_data = to_file.getvalue()
 
394
        if response_data != self.test_data:
 
395
            message = "Data not equal.  Expected %d bytes, received %d."
 
396
            self.fail(message % (len(response_data), self.test_data_len))
 
397
 
 
398
    def test_to_eof(self):
 
399
        """Read to end-of-file and verify that the reads are not larger than
 
400
        the maximum read size."""
 
401
        # make sure test data is larger than max read size
 
402
        self.assertTrue(self.test_data_len > self.block_size)
 
403
 
 
404
        # retrieve data to EOF
 
405
        from_file = FakeReadFile(self.test_data)
 
406
        to_file = StringIO()
 
407
        pumpfile(from_file, to_file, -1, self.block_size)
 
408
 
 
409
        # verify read size was equal to the maximum read size
 
410
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
411
        self.assertEqual(from_file.get_read_count(), 4)
 
412
 
 
413
        # report error if the data wasn't equal (we only report the size due
 
414
        # to the length of the data)
 
415
        response_data = to_file.getvalue()
 
416
        if response_data != self.test_data:
 
417
            message = "Data not equal.  Expected %d bytes, received %d."
 
418
            self.fail(message % (len(response_data), self.test_data_len))
 
419
 
 
420
    def test_defaults(self):
 
421
        """Verifies that the default arguments will read to EOF -- this
 
422
        test verifies that any existing usages of pumpfile will not be broken
 
423
        with this new version."""
 
424
        # retrieve data using default (old) pumpfile method
 
425
        from_file = FakeReadFile(self.test_data)
 
426
        to_file = StringIO()
 
427
        pumpfile(from_file, to_file)
 
428
 
 
429
        # report error if the data wasn't equal (we only report the size due
 
430
        # to the length of the data)
 
431
        response_data = to_file.getvalue()
 
432
        if response_data != self.test_data:
 
433
            message = "Data not equal.  Expected %d bytes, received %d."
 
434
            self.fail(message % (len(response_data), self.test_data_len))
74
435
 
75
436
class TestSafeUnicode(TestCase):
76
437
 
90
451
        self.assertRaises(BzrBadParameterNotUnicode,
91
452
                          osutils.safe_unicode,
92
453
                          '\xbb\xbb')
 
454
 
 
455
 
 
456
class TestSafeUtf8(TestCase):
 
457
 
 
458
    def test_from_ascii_string(self):
 
459
        f = 'foobar'
 
460
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
461
 
 
462
    def test_from_unicode_string_ascii_contents(self):
 
463
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
464
 
 
465
    def test_from_unicode_string_unicode_contents(self):
 
466
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
467
 
 
468
    def test_from_utf8_string(self):
 
469
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
470
 
 
471
    def test_bad_utf8_string(self):
 
472
        self.assertRaises(BzrBadParameterNotUnicode,
 
473
                          osutils.safe_utf8, '\xbb\xbb')
 
474
 
 
475
 
 
476
class TestSafeRevisionId(TestCase):
 
477
 
 
478
    def test_from_ascii_string(self):
 
479
        # this shouldn't give a warning because it's getting an ascii string
 
480
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
481
 
 
482
    def test_from_unicode_string_ascii_contents(self):
 
483
        self.assertEqual('bargam',
 
484
                         osutils.safe_revision_id(u'bargam', warn=False))
 
485
 
 
486
    def test_from_unicode_deprecated(self):
 
487
        self.assertEqual('bargam',
 
488
            self.callDeprecated([osutils._revision_id_warning],
 
489
                                osutils.safe_revision_id, u'bargam'))
 
490
 
 
491
    def test_from_unicode_string_unicode_contents(self):
 
492
        self.assertEqual('bargam\xc2\xae',
 
493
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
494
 
 
495
    def test_from_utf8_string(self):
 
496
        self.assertEqual('foo\xc2\xae',
 
497
                         osutils.safe_revision_id('foo\xc2\xae'))
 
498
 
 
499
    def test_none(self):
 
500
        """Currently, None is a valid revision_id"""
 
501
        self.assertEqual(None, osutils.safe_revision_id(None))
 
502
 
 
503
 
 
504
class TestSafeFileId(TestCase):
 
505
 
 
506
    def test_from_ascii_string(self):
 
507
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
508
 
 
509
    def test_from_unicode_string_ascii_contents(self):
 
510
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
511
 
 
512
    def test_from_unicode_deprecated(self):
 
513
        self.assertEqual('bargam',
 
514
            self.callDeprecated([osutils._file_id_warning],
 
515
                                osutils.safe_file_id, u'bargam'))
 
516
 
 
517
    def test_from_unicode_string_unicode_contents(self):
 
518
        self.assertEqual('bargam\xc2\xae',
 
519
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
520
 
 
521
    def test_from_utf8_string(self):
 
522
        self.assertEqual('foo\xc2\xae',
 
523
                         osutils.safe_file_id('foo\xc2\xae'))
 
524
 
 
525
    def test_none(self):
 
526
        """Currently, None is a valid revision_id"""
 
527
        self.assertEqual(None, osutils.safe_file_id(None))
 
528
 
 
529
 
 
530
class TestWin32Funcs(TestCase):
 
531
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
532
 
 
533
    def test_abspath(self):
 
534
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
535
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
536
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
537
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
538
 
 
539
    def test_realpath(self):
 
540
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
541
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
542
 
 
543
    def test_pathjoin(self):
 
544
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
545
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
546
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
547
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
548
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
549
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
550
 
 
551
    def test_normpath(self):
 
552
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
553
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
554
 
 
555
    def test_getcwd(self):
 
556
        cwd = osutils._win32_getcwd()
 
557
        os_cwd = os.getcwdu()
 
558
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
559
        # win32 is inconsistent whether it returns lower or upper case
 
560
        # and even if it was consistent the user might type the other
 
561
        # so we force it to uppercase
 
562
        # running python.exe under cmd.exe return capital C:\\
 
563
        # running win32 python inside a cygwin shell returns lowercase
 
564
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
565
 
 
566
    def test_fixdrive(self):
 
567
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
568
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
569
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
570
 
 
571
    def test_win98_abspath(self):
 
572
        # absolute path
 
573
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
574
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
575
        # UNC path
 
576
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
577
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
578
        # relative path
 
579
        cwd = osutils.getcwd().rstrip('/')
 
580
        drive = osutils._nt_splitdrive(cwd)[0]
 
581
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
582
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
583
        # unicode path
 
584
        u = u'\u1234'
 
585
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
586
 
 
587
 
 
588
class TestWin32FuncsDirs(TestCaseInTempDir):
 
589
    """Test win32 functions that create files."""
 
590
    
 
591
    def test_getcwd(self):
 
592
        if win32utils.winver == 'Windows 98':
 
593
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
594
        # Make sure getcwd can handle unicode filenames
 
595
        try:
 
596
            os.mkdir(u'mu-\xb5')
 
597
        except UnicodeError:
 
598
            raise TestSkipped("Unable to create Unicode filename")
 
599
 
 
600
        os.chdir(u'mu-\xb5')
 
601
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
602
        #       it will change the normalization of B\xe5gfors
 
603
        #       Consider using a different unicode character, or make
 
604
        #       osutils.getcwd() renormalize the path.
 
605
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
606
 
 
607
    def test_minimum_path_selection(self):
 
608
        self.assertEqual(set(),
 
609
            osutils.minimum_path_selection([]))
 
610
        self.assertEqual(set(['a', 'b']),
 
611
            osutils.minimum_path_selection(['a', 'b']))
 
612
        self.assertEqual(set(['a/', 'b']),
 
613
            osutils.minimum_path_selection(['a/', 'b']))
 
614
        self.assertEqual(set(['a/', 'b']),
 
615
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
616
 
 
617
    def test_mkdtemp(self):
 
618
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
619
        self.assertFalse('\\' in tmpdir)
 
620
 
 
621
    def test_rename(self):
 
622
        a = open('a', 'wb')
 
623
        a.write('foo\n')
 
624
        a.close()
 
625
        b = open('b', 'wb')
 
626
        b.write('baz\n')
 
627
        b.close()
 
628
 
 
629
        osutils._win32_rename('b', 'a')
 
630
        self.failUnlessExists('a')
 
631
        self.failIfExists('b')
 
632
        self.assertFileEqual('baz\n', 'a')
 
633
 
 
634
    def test_rename_missing_file(self):
 
635
        a = open('a', 'wb')
 
636
        a.write('foo\n')
 
637
        a.close()
 
638
 
 
639
        try:
 
640
            osutils._win32_rename('b', 'a')
 
641
        except (IOError, OSError), e:
 
642
            self.assertEqual(errno.ENOENT, e.errno)
 
643
        self.assertFileEqual('foo\n', 'a')
 
644
 
 
645
    def test_rename_missing_dir(self):
 
646
        os.mkdir('a')
 
647
        try:
 
648
            osutils._win32_rename('b', 'a')
 
649
        except (IOError, OSError), e:
 
650
            self.assertEqual(errno.ENOENT, e.errno)
 
651
 
 
652
    def test_rename_current_dir(self):
 
653
        os.mkdir('a')
 
654
        os.chdir('a')
 
655
        # You can't rename the working directory
 
656
        # doing rename non-existant . usually
 
657
        # just raises ENOENT, since non-existant
 
658
        # doesn't exist.
 
659
        try:
 
660
            osutils._win32_rename('b', '.')
 
661
        except (IOError, OSError), e:
 
662
            self.assertEqual(errno.ENOENT, e.errno)
 
663
 
 
664
    def test_splitpath(self):
 
665
        def check(expected, path):
 
666
            self.assertEqual(expected, osutils.splitpath(path))
 
667
 
 
668
        check(['a'], 'a')
 
669
        check(['a', 'b'], 'a/b')
 
670
        check(['a', 'b'], 'a/./b')
 
671
        check(['a', '.b'], 'a/.b')
 
672
        check(['a', '.b'], 'a\\.b')
 
673
 
 
674
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
675
 
 
676
 
 
677
class TestMacFuncsDirs(TestCaseInTempDir):
 
678
    """Test mac special functions that require directories."""
 
679
 
 
680
    def test_getcwd(self):
 
681
        # On Mac, this will actually create Ba\u030agfors
 
682
        # but chdir will still work, because it accepts both paths
 
683
        try:
 
684
            os.mkdir(u'B\xe5gfors')
 
685
        except UnicodeError:
 
686
            raise TestSkipped("Unable to create Unicode filename")
 
687
 
 
688
        os.chdir(u'B\xe5gfors')
 
689
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
690
 
 
691
    def test_getcwd_nonnorm(self):
 
692
        # Test that _mac_getcwd() will normalize this path
 
693
        try:
 
694
            os.mkdir(u'Ba\u030agfors')
 
695
        except UnicodeError:
 
696
            raise TestSkipped("Unable to create Unicode filename")
 
697
 
 
698
        os.chdir(u'Ba\u030agfors')
 
699
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
700
 
 
701
 
 
702
class TestSplitLines(TestCase):
 
703
 
 
704
    def test_split_unicode(self):
 
705
        self.assertEqual([u'foo\n', u'bar\xae'],
 
706
                         osutils.split_lines(u'foo\nbar\xae'))
 
707
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
708
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
709
 
 
710
    def test_split_with_carriage_returns(self):
 
711
        self.assertEqual(['foo\rbar\n'],
 
712
                         osutils.split_lines('foo\rbar\n'))
 
713
 
 
714
 
 
715
class TestWalkDirs(TestCaseInTempDir):
 
716
 
 
717
    def test_walkdirs(self):
 
718
        tree = [
 
719
            '.bzr',
 
720
            '0file',
 
721
            '1dir/',
 
722
            '1dir/0file',
 
723
            '1dir/1dir/',
 
724
            '2file'
 
725
            ]
 
726
        self.build_tree(tree)
 
727
        expected_dirblocks = [
 
728
                (('', '.'),
 
729
                 [('0file', '0file', 'file'),
 
730
                  ('1dir', '1dir', 'directory'),
 
731
                  ('2file', '2file', 'file'),
 
732
                 ]
 
733
                ),
 
734
                (('1dir', './1dir'),
 
735
                 [('1dir/0file', '0file', 'file'),
 
736
                  ('1dir/1dir', '1dir', 'directory'),
 
737
                 ]
 
738
                ),
 
739
                (('1dir/1dir', './1dir/1dir'),
 
740
                 [
 
741
                 ]
 
742
                ),
 
743
            ]
 
744
        result = []
 
745
        found_bzrdir = False
 
746
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
747
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
748
                # this tests the filtering of selected paths
 
749
                found_bzrdir = True
 
750
                del dirblock[0]
 
751
            result.append((dirdetail, dirblock))
 
752
 
 
753
        self.assertTrue(found_bzrdir)
 
754
        self.assertEqual(expected_dirblocks,
 
755
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
756
        # you can search a subdir only, with a supplied prefix.
 
757
        result = []
 
758
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
759
            result.append(dirblock)
 
760
        self.assertEqual(expected_dirblocks[1:],
 
761
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
762
 
 
763
    def test__walkdirs_utf8(self):
 
764
        tree = [
 
765
            '.bzr',
 
766
            '0file',
 
767
            '1dir/',
 
768
            '1dir/0file',
 
769
            '1dir/1dir/',
 
770
            '2file'
 
771
            ]
 
772
        self.build_tree(tree)
 
773
        expected_dirblocks = [
 
774
                (('', '.'),
 
775
                 [('0file', '0file', 'file'),
 
776
                  ('1dir', '1dir', 'directory'),
 
777
                  ('2file', '2file', 'file'),
 
778
                 ]
 
779
                ),
 
780
                (('1dir', './1dir'),
 
781
                 [('1dir/0file', '0file', 'file'),
 
782
                  ('1dir/1dir', '1dir', 'directory'),
 
783
                 ]
 
784
                ),
 
785
                (('1dir/1dir', './1dir/1dir'),
 
786
                 [
 
787
                 ]
 
788
                ),
 
789
            ]
 
790
        result = []
 
791
        found_bzrdir = False
 
792
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
793
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
794
                # this tests the filtering of selected paths
 
795
                found_bzrdir = True
 
796
                del dirblock[0]
 
797
            result.append((dirdetail, dirblock))
 
798
 
 
799
        self.assertTrue(found_bzrdir)
 
800
        self.assertEqual(expected_dirblocks,
 
801
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
802
        # you can search a subdir only, with a supplied prefix.
 
803
        result = []
 
804
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
805
            result.append(dirblock)
 
806
        self.assertEqual(expected_dirblocks[1:],
 
807
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
808
 
 
809
    def _filter_out_stat(self, result):
 
810
        """Filter out the stat value from the walkdirs result"""
 
811
        for dirdetail, dirblock in result:
 
812
            new_dirblock = []
 
813
            for info in dirblock:
 
814
                # Ignore info[3] which is the stat
 
815
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
816
            dirblock[:] = new_dirblock
 
817
 
 
818
    def test_unicode_walkdirs(self):
 
819
        """Walkdirs should always return unicode paths."""
 
820
        name0 = u'0file-\xb6'
 
821
        name1 = u'1dir-\u062c\u0648'
 
822
        name2 = u'2file-\u0633'
 
823
        tree = [
 
824
            name0,
 
825
            name1 + '/',
 
826
            name1 + '/' + name0,
 
827
            name1 + '/' + name1 + '/',
 
828
            name2,
 
829
            ]
 
830
        try:
 
831
            self.build_tree(tree)
 
832
        except UnicodeError:
 
833
            raise TestSkipped('Could not represent Unicode chars'
 
834
                              ' in current encoding.')
 
835
        expected_dirblocks = [
 
836
                ((u'', u'.'),
 
837
                 [(name0, name0, 'file', './' + name0),
 
838
                  (name1, name1, 'directory', './' + name1),
 
839
                  (name2, name2, 'file', './' + name2),
 
840
                 ]
 
841
                ),
 
842
                ((name1, './' + name1),
 
843
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
844
                                                        + '/' + name0),
 
845
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
846
                                                            + '/' + name1),
 
847
                 ]
 
848
                ),
 
849
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
850
                 [
 
851
                 ]
 
852
                ),
 
853
            ]
 
854
        result = list(osutils.walkdirs('.'))
 
855
        self._filter_out_stat(result)
 
856
        self.assertEqual(expected_dirblocks, result)
 
857
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
858
        self._filter_out_stat(result)
 
859
        self.assertEqual(expected_dirblocks[1:], result)
 
860
 
 
861
    def test_unicode__walkdirs_utf8(self):
 
862
        """Walkdirs_utf8 should always return utf8 paths.
 
863
 
 
864
        The abspath portion might be in unicode or utf-8
 
865
        """
 
866
        name0 = u'0file-\xb6'
 
867
        name1 = u'1dir-\u062c\u0648'
 
868
        name2 = u'2file-\u0633'
 
869
        tree = [
 
870
            name0,
 
871
            name1 + '/',
 
872
            name1 + '/' + name0,
 
873
            name1 + '/' + name1 + '/',
 
874
            name2,
 
875
            ]
 
876
        try:
 
877
            self.build_tree(tree)
 
878
        except UnicodeError:
 
879
            raise TestSkipped('Could not represent Unicode chars'
 
880
                              ' in current encoding.')
 
881
        name0 = name0.encode('utf8')
 
882
        name1 = name1.encode('utf8')
 
883
        name2 = name2.encode('utf8')
 
884
 
 
885
        expected_dirblocks = [
 
886
                (('', '.'),
 
887
                 [(name0, name0, 'file', './' + name0),
 
888
                  (name1, name1, 'directory', './' + name1),
 
889
                  (name2, name2, 'file', './' + name2),
 
890
                 ]
 
891
                ),
 
892
                ((name1, './' + name1),
 
893
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
894
                                                        + '/' + name0),
 
895
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
896
                                                            + '/' + name1),
 
897
                 ]
 
898
                ),
 
899
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
900
                 [
 
901
                 ]
 
902
                ),
 
903
            ]
 
904
        result = []
 
905
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
906
        # all abspaths are Unicode, and encode them back into utf8.
 
907
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
908
            self.assertIsInstance(dirdetail[0], str)
 
909
            if isinstance(dirdetail[1], unicode):
 
910
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
911
                dirblock = [list(info) for info in dirblock]
 
912
                for info in dirblock:
 
913
                    self.assertIsInstance(info[4], unicode)
 
914
                    info[4] = info[4].encode('utf8')
 
915
            new_dirblock = []
 
916
            for info in dirblock:
 
917
                self.assertIsInstance(info[0], str)
 
918
                self.assertIsInstance(info[1], str)
 
919
                self.assertIsInstance(info[4], str)
 
920
                # Remove the stat information
 
921
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
922
            result.append((dirdetail, new_dirblock))
 
923
        self.assertEqual(expected_dirblocks, result)
 
924
 
 
925
    def test_unicode__walkdirs_unicode_to_utf8(self):
 
926
        """walkdirs_unicode_to_utf8 should be a safe fallback everywhere
 
927
 
 
928
        The abspath portion should be in unicode
 
929
        """
 
930
        name0u = u'0file-\xb6'
 
931
        name1u = u'1dir-\u062c\u0648'
 
932
        name2u = u'2file-\u0633'
 
933
        tree = [
 
934
            name0u,
 
935
            name1u + '/',
 
936
            name1u + '/' + name0u,
 
937
            name1u + '/' + name1u + '/',
 
938
            name2u,
 
939
            ]
 
940
        try:
 
941
            self.build_tree(tree)
 
942
        except UnicodeError:
 
943
            raise TestSkipped('Could not represent Unicode chars'
 
944
                              ' in current encoding.')
 
945
        name0 = name0u.encode('utf8')
 
946
        name1 = name1u.encode('utf8')
 
947
        name2 = name2u.encode('utf8')
 
948
 
 
949
        # All of the abspaths should be in unicode, all of the relative paths
 
950
        # should be in utf8
 
951
        expected_dirblocks = [
 
952
                (('', '.'),
 
953
                 [(name0, name0, 'file', './' + name0u),
 
954
                  (name1, name1, 'directory', './' + name1u),
 
955
                  (name2, name2, 'file', './' + name2u),
 
956
                 ]
 
957
                ),
 
958
                ((name1, './' + name1u),
 
959
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
960
                                                        + '/' + name0u),
 
961
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
962
                                                            + '/' + name1u),
 
963
                 ]
 
964
                ),
 
965
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
966
                 [
 
967
                 ]
 
968
                ),
 
969
            ]
 
970
        result = list(osutils._walkdirs_unicode_to_utf8('.'))
 
971
        self._filter_out_stat(result)
 
972
        self.assertEqual(expected_dirblocks, result)
 
973
 
 
974
    def assertPathCompare(self, path_less, path_greater):
 
975
        """check that path_less and path_greater compare correctly."""
 
976
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
977
            path_less, path_less))
 
978
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
979
            path_greater, path_greater))
 
980
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
981
            path_less, path_greater))
 
982
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
983
            path_greater, path_less))
 
984
 
 
985
    def test_compare_paths_prefix_order(self):
 
986
        # root before all else
 
987
        self.assertPathCompare("/", "/a")
 
988
        # alpha within a dir
 
989
        self.assertPathCompare("/a", "/b")
 
990
        self.assertPathCompare("/b", "/z")
 
991
        # high dirs before lower.
 
992
        self.assertPathCompare("/z", "/a/a")
 
993
        # except if the deeper dir should be output first
 
994
        self.assertPathCompare("/a/b/c", "/d/g")
 
995
        # lexical betwen dirs of the same height
 
996
        self.assertPathCompare("/a/z", "/z/z")
 
997
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
998
 
 
999
        # this should also be consistent for no leading / paths
 
1000
        # root before all else
 
1001
        self.assertPathCompare("", "a")
 
1002
        # alpha within a dir
 
1003
        self.assertPathCompare("a", "b")
 
1004
        self.assertPathCompare("b", "z")
 
1005
        # high dirs before lower.
 
1006
        self.assertPathCompare("z", "a/a")
 
1007
        # except if the deeper dir should be output first
 
1008
        self.assertPathCompare("a/b/c", "d/g")
 
1009
        # lexical betwen dirs of the same height
 
1010
        self.assertPathCompare("a/z", "z/z")
 
1011
        self.assertPathCompare("a/c/z", "a/d/e")
 
1012
 
 
1013
    def test_path_prefix_sorting(self):
 
1014
        """Doing a sort on path prefix should match our sample data."""
 
1015
        original_paths = [
 
1016
            'a',
 
1017
            'a/b',
 
1018
            'a/b/c',
 
1019
            'b',
 
1020
            'b/c',
 
1021
            'd',
 
1022
            'd/e',
 
1023
            'd/e/f',
 
1024
            'd/f',
 
1025
            'd/g',
 
1026
            'g',
 
1027
            ]
 
1028
 
 
1029
        dir_sorted_paths = [
 
1030
            'a',
 
1031
            'b',
 
1032
            'd',
 
1033
            'g',
 
1034
            'a/b',
 
1035
            'a/b/c',
 
1036
            'b/c',
 
1037
            'd/e',
 
1038
            'd/f',
 
1039
            'd/g',
 
1040
            'd/e/f',
 
1041
            ]
 
1042
 
 
1043
        self.assertEqual(
 
1044
            dir_sorted_paths,
 
1045
            sorted(original_paths, key=osutils.path_prefix_key))
 
1046
        # using the comparison routine shoudl work too:
 
1047
        self.assertEqual(
 
1048
            dir_sorted_paths,
 
1049
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1050
 
 
1051
 
 
1052
class TestCopyTree(TestCaseInTempDir):
 
1053
    
 
1054
    def test_copy_basic_tree(self):
 
1055
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1056
        osutils.copy_tree('source', 'target')
 
1057
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1058
        self.assertEqual(['c'], os.listdir('target/b'))
 
1059
 
 
1060
    def test_copy_tree_target_exists(self):
 
1061
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1062
                         'target/'])
 
1063
        osutils.copy_tree('source', 'target')
 
1064
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1065
        self.assertEqual(['c'], os.listdir('target/b'))
 
1066
 
 
1067
    def test_copy_tree_symlinks(self):
 
1068
        self.requireFeature(SymlinkFeature)
 
1069
        self.build_tree(['source/'])
 
1070
        os.symlink('a/generic/path', 'source/lnk')
 
1071
        osutils.copy_tree('source', 'target')
 
1072
        self.assertEqual(['lnk'], os.listdir('target'))
 
1073
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1074
 
 
1075
    def test_copy_tree_handlers(self):
 
1076
        processed_files = []
 
1077
        processed_links = []
 
1078
        def file_handler(from_path, to_path):
 
1079
            processed_files.append(('f', from_path, to_path))
 
1080
        def dir_handler(from_path, to_path):
 
1081
            processed_files.append(('d', from_path, to_path))
 
1082
        def link_handler(from_path, to_path):
 
1083
            processed_links.append((from_path, to_path))
 
1084
        handlers = {'file':file_handler,
 
1085
                    'directory':dir_handler,
 
1086
                    'symlink':link_handler,
 
1087
                   }
 
1088
 
 
1089
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1090
        if osutils.has_symlinks():
 
1091
            os.symlink('a/generic/path', 'source/lnk')
 
1092
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1093
 
 
1094
        self.assertEqual([('d', 'source', 'target'),
 
1095
                          ('f', 'source/a', 'target/a'),
 
1096
                          ('d', 'source/b', 'target/b'),
 
1097
                          ('f', 'source/b/c', 'target/b/c'),
 
1098
                         ], processed_files)
 
1099
        self.failIfExists('target')
 
1100
        if osutils.has_symlinks():
 
1101
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1102
 
 
1103
 
 
1104
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1105
# [bialix] 2006/12/26
 
1106
 
 
1107
 
 
1108
class TestSetUnsetEnv(TestCase):
 
1109
    """Test updating the environment"""
 
1110
 
 
1111
    def setUp(self):
 
1112
        super(TestSetUnsetEnv, self).setUp()
 
1113
 
 
1114
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1115
                         'Environment was not cleaned up properly.'
 
1116
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1117
        def cleanup():
 
1118
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1119
                del os.environ['BZR_TEST_ENV_VAR']
 
1120
 
 
1121
        self.addCleanup(cleanup)
 
1122
 
 
1123
    def test_set(self):
 
1124
        """Test that we can set an env variable"""
 
1125
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1126
        self.assertEqual(None, old)
 
1127
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1128
 
 
1129
    def test_double_set(self):
 
1130
        """Test that we get the old value out"""
 
1131
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1132
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1133
        self.assertEqual('foo', old)
 
1134
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1135
 
 
1136
    def test_unicode(self):
 
1137
        """Environment can only contain plain strings
 
1138
        
 
1139
        So Unicode strings must be encoded.
 
1140
        """
 
1141
        uni_val, env_val = probe_unicode_in_user_encoding()
 
1142
        if uni_val is None:
 
1143
            raise TestSkipped('Cannot find a unicode character that works in'
 
1144
                              ' encoding %s' % (bzrlib.user_encoding,))
 
1145
 
 
1146
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1147
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1148
 
 
1149
    def test_unset(self):
 
1150
        """Test that passing None will remove the env var"""
 
1151
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1152
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1153
        self.assertEqual('foo', old)
 
1154
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1155
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1156
 
 
1157
 
 
1158
class TestLocalTimeOffset(TestCase):
 
1159
 
 
1160
    def test_local_time_offset(self):
 
1161
        """Test that local_time_offset() returns a sane value."""
 
1162
        offset = osutils.local_time_offset()
 
1163
        self.assertTrue(isinstance(offset, int))
 
1164
        # Test that the offset is no more than a eighteen hours in
 
1165
        # either direction.
 
1166
        # Time zone handling is system specific, so it is difficult to
 
1167
        # do more specific tests, but a value outside of this range is
 
1168
        # probably wrong.
 
1169
        eighteen_hours = 18 * 3600
 
1170
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1171
 
 
1172
    def test_local_time_offset_with_timestamp(self):
 
1173
        """Test that local_time_offset() works with a timestamp."""
 
1174
        offset = osutils.local_time_offset(1000000000.1234567)
 
1175
        self.assertTrue(isinstance(offset, int))
 
1176
        eighteen_hours = 18 * 3600
 
1177
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1178
 
 
1179
 
 
1180
class TestShaFileByName(TestCaseInTempDir):
 
1181
 
 
1182
    def test_sha_empty(self):
 
1183
        self.build_tree_contents([('foo', '')])
 
1184
        expected_sha = osutils.sha_string('')
 
1185
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1186
 
 
1187
    def test_sha_mixed_endings(self):
 
1188
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1189
        self.build_tree_contents([('foo', text)])
 
1190
        expected_sha = osutils.sha_string(text)
 
1191
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1192
 
 
1193
 
 
1194
_debug_text = \
 
1195
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1196
#
 
1197
# This program is free software; you can redistribute it and/or modify
 
1198
# it under the terms of the GNU General Public License as published by
 
1199
# the Free Software Foundation; either version 2 of the License, or
 
1200
# (at your option) any later version.
 
1201
#
 
1202
# This program is distributed in the hope that it will be useful,
 
1203
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1204
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1205
# GNU General Public License for more details.
 
1206
#
 
1207
# You should have received a copy of the GNU General Public License
 
1208
# along with this program; if not, write to the Free Software
 
1209
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1210
 
 
1211
 
 
1212
# NOTE: If update these, please also update the help for global-options in
 
1213
#       bzrlib/help_topics/__init__.py
 
1214
 
 
1215
debug_flags = set()
 
1216
"""Set of flags that enable different debug behaviour.
 
1217
 
 
1218
These are set with eg ``-Dlock`` on the bzr command line.
 
1219
 
 
1220
Options include:
 
1221
 
 
1222
 * auth - show authentication sections used
 
1223
 * error - show stack traces for all top level exceptions
 
1224
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1225
 * fetch - trace history copying between repositories
 
1226
 * graph - trace graph traversal information
 
1227
 * hashcache - log every time a working file is read to determine its hash
 
1228
 * hooks - trace hook execution
 
1229
 * hpss - trace smart protocol requests and responses
 
1230
 * http - trace http connections, requests and responses
 
1231
 * index - trace major index operations
 
1232
 * knit - trace knit operations
 
1233
 * lock - trace when lockdir locks are taken or released
 
1234
 * merge - emit information for debugging merges
 
1235
 * pack - emit information about pack operations
 
1236
 
 
1237
"""
 
1238
'''
 
1239
 
 
1240
 
 
1241
class TestResourceLoading(TestCaseInTempDir):
 
1242
 
 
1243
    def test_resource_string(self):
 
1244
        # test resource in bzrlib
 
1245
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1246
        self.assertEquals(_debug_text, text)
 
1247
        # test resource under bzrlib
 
1248
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1249
        self.assertContainsRe(text, "class TextUIFactory")
 
1250
        # test unsupported package
 
1251
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1252
            'yyy.xx')
 
1253
        # test unknown resource
 
1254
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')