~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_osutils.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-06-20 01:09:18 UTC
  • mfrom: (3505.1.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080620010918-64z4xylh1ap5hgyf
Accept user names with @s in URLs (Neil Martinsen-Burrell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the osutils wrapper."""
 
18
 
 
19
import errno
 
20
import os
 
21
import socket
 
22
import stat
 
23
import sys
 
24
 
 
25
import bzrlib
 
26
from bzrlib import (
 
27
    errors,
 
28
    osutils,
 
29
    win32utils,
 
30
    )
 
31
from bzrlib.errors import BzrBadParameterNotUnicode, InvalidURL
 
32
from bzrlib.osutils import (
 
33
        is_inside_any,
 
34
        is_inside_or_parent_of_any,
 
35
        pathjoin,
 
36
        pumpfile,
 
37
        )
 
38
from bzrlib.tests import (
 
39
        probe_unicode_in_user_encoding,
 
40
        StringIOWrapper,
 
41
        SymlinkFeature,
 
42
        TestCase,
 
43
        TestCaseInTempDir,
 
44
        TestSkipped,
 
45
        )
 
46
from bzrlib.tests.file_utils import (
 
47
    FakeReadFile,
 
48
    )
 
49
from cStringIO import StringIO
 
50
 
 
51
class TestOSUtils(TestCaseInTempDir):
 
52
 
 
53
    def test_contains_whitespace(self):
 
54
        self.failUnless(osutils.contains_whitespace(u' '))
 
55
        self.failUnless(osutils.contains_whitespace(u'hello there'))
 
56
        self.failUnless(osutils.contains_whitespace(u'hellothere\n'))
 
57
        self.failUnless(osutils.contains_whitespace(u'hello\nthere'))
 
58
        self.failUnless(osutils.contains_whitespace(u'hello\rthere'))
 
59
        self.failUnless(osutils.contains_whitespace(u'hello\tthere'))
 
60
 
 
61
        # \xa0 is "Non-breaking-space" which on some python locales thinks it
 
62
        # is whitespace, but we do not.
 
63
        self.failIf(osutils.contains_whitespace(u''))
 
64
        self.failIf(osutils.contains_whitespace(u'hellothere'))
 
65
        self.failIf(osutils.contains_whitespace(u'hello\xa0there'))
 
66
 
 
67
    def test_fancy_rename(self):
 
68
        # This should work everywhere
 
69
        def rename(a, b):
 
70
            osutils.fancy_rename(a, b,
 
71
                    rename_func=os.rename,
 
72
                    unlink_func=os.unlink)
 
73
 
 
74
        open('a', 'wb').write('something in a\n')
 
75
        rename('a', 'b')
 
76
        self.failIfExists('a')
 
77
        self.failUnlessExists('b')
 
78
        self.check_file_contents('b', 'something in a\n')
 
79
 
 
80
        open('a', 'wb').write('new something in a\n')
 
81
        rename('b', 'a')
 
82
 
 
83
        self.check_file_contents('a', 'something in a\n')
 
84
 
 
85
    def test_rename(self):
 
86
        # Rename should be semi-atomic on all platforms
 
87
        open('a', 'wb').write('something in a\n')
 
88
        osutils.rename('a', 'b')
 
89
        self.failIfExists('a')
 
90
        self.failUnlessExists('b')
 
91
        self.check_file_contents('b', 'something in a\n')
 
92
 
 
93
        open('a', 'wb').write('new something in a\n')
 
94
        osutils.rename('b', 'a')
 
95
 
 
96
        self.check_file_contents('a', 'something in a\n')
 
97
 
 
98
    # TODO: test fancy_rename using a MemoryTransport
 
99
 
 
100
    def test_rename_change_case(self):
 
101
        # on Windows we should be able to change filename case by rename
 
102
        self.build_tree(['a', 'b/'])
 
103
        osutils.rename('a', 'A')
 
104
        osutils.rename('b', 'B')
 
105
        # we can't use failUnlessExists on case-insensitive filesystem
 
106
        # so try to check shape of the tree
 
107
        shape = sorted(os.listdir('.'))
 
108
        self.assertEquals(['A', 'B'], shape)
 
109
 
 
110
    def test_01_rand_chars_empty(self):
 
111
        result = osutils.rand_chars(0)
 
112
        self.assertEqual(result, '')
 
113
 
 
114
    def test_02_rand_chars_100(self):
 
115
        result = osutils.rand_chars(100)
 
116
        self.assertEqual(len(result), 100)
 
117
        self.assertEqual(type(result), str)
 
118
        self.assertContainsRe(result, r'^[a-z0-9]{100}$')
 
119
 
 
120
    def test_is_inside(self):
 
121
        is_inside = osutils.is_inside
 
122
        self.assertTrue(is_inside('src', 'src/foo.c'))
 
123
        self.assertFalse(is_inside('src', 'srccontrol'))
 
124
        self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
 
125
        self.assertTrue(is_inside('foo.c', 'foo.c'))
 
126
        self.assertFalse(is_inside('foo.c', ''))
 
127
        self.assertTrue(is_inside('', 'foo.c'))
 
128
 
 
129
    def test_is_inside_any(self):
 
130
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
131
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
132
                         (['src'], SRC_FOO_C),
 
133
                         (['src'], 'src'),
 
134
                         ]:
 
135
            self.assert_(is_inside_any(dirs, fn))
 
136
        for dirs, fn in [(['src'], 'srccontrol'),
 
137
                         (['src'], 'srccontrol/foo')]:
 
138
            self.assertFalse(is_inside_any(dirs, fn))
 
139
 
 
140
    def test_is_inside_or_parent_of_any(self):
 
141
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
142
                         (['src'], 'src/foo.c'),
 
143
                         (['src/bar.c'], 'src'),
 
144
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
145
                         (['src'], 'src'),
 
146
                         ]:
 
147
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
148
            
 
149
        for dirs, fn in [(['src'], 'srccontrol'),
 
150
                         (['srccontrol/foo.c'], 'src'),
 
151
                         (['src'], 'srccontrol/foo')]:
 
152
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
 
153
 
 
154
    def test_rmtree(self):
 
155
        # Check to remove tree with read-only files/dirs
 
156
        os.mkdir('dir')
 
157
        f = file('dir/file', 'w')
 
158
        f.write('spam')
 
159
        f.close()
 
160
        # would like to also try making the directory readonly, but at the
 
161
        # moment python shutil.rmtree doesn't handle that properly - it would
 
162
        # need to chmod the directory before removing things inside it - deferred
 
163
        # for now -- mbp 20060505
 
164
        # osutils.make_readonly('dir')
 
165
        osutils.make_readonly('dir/file')
 
166
 
 
167
        osutils.rmtree('dir')
 
168
 
 
169
        self.failIfExists('dir/file')
 
170
        self.failIfExists('dir')
 
171
 
 
172
    def test_file_kind(self):
 
173
        self.build_tree(['file', 'dir/'])
 
174
        self.assertEquals('file', osutils.file_kind('file'))
 
175
        self.assertEquals('directory', osutils.file_kind('dir/'))
 
176
        if osutils.has_symlinks():
 
177
            os.symlink('symlink', 'symlink')
 
178
            self.assertEquals('symlink', osutils.file_kind('symlink'))
 
179
        
 
180
        # TODO: jam 20060529 Test a block device
 
181
        try:
 
182
            os.lstat('/dev/null')
 
183
        except OSError, e:
 
184
            if e.errno not in (errno.ENOENT,):
 
185
                raise
 
186
        else:
 
187
            self.assertEquals('chardev', osutils.file_kind('/dev/null'))
 
188
 
 
189
        mkfifo = getattr(os, 'mkfifo', None)
 
190
        if mkfifo:
 
191
            mkfifo('fifo')
 
192
            try:
 
193
                self.assertEquals('fifo', osutils.file_kind('fifo'))
 
194
            finally:
 
195
                os.remove('fifo')
 
196
 
 
197
        AF_UNIX = getattr(socket, 'AF_UNIX', None)
 
198
        if AF_UNIX:
 
199
            s = socket.socket(AF_UNIX)
 
200
            s.bind('socket')
 
201
            try:
 
202
                self.assertEquals('socket', osutils.file_kind('socket'))
 
203
            finally:
 
204
                os.remove('socket')
 
205
 
 
206
    def test_kind_marker(self):
 
207
        self.assertEqual(osutils.kind_marker('file'), '')
 
208
        self.assertEqual(osutils.kind_marker('directory'), '/')
 
209
        self.assertEqual(osutils.kind_marker('symlink'), '@')
 
210
        self.assertEqual(osutils.kind_marker('tree-reference'), '+')
 
211
 
 
212
    def test_get_umask(self):
 
213
        if sys.platform == 'win32':
 
214
            # umask always returns '0', no way to set it
 
215
            self.assertEqual(0, osutils.get_umask())
 
216
            return
 
217
 
 
218
        orig_umask = osutils.get_umask()
 
219
        try:
 
220
            os.umask(0222)
 
221
            self.assertEqual(0222, osutils.get_umask())
 
222
            os.umask(0022)
 
223
            self.assertEqual(0022, osutils.get_umask())
 
224
            os.umask(0002)
 
225
            self.assertEqual(0002, osutils.get_umask())
 
226
            os.umask(0027)
 
227
            self.assertEqual(0027, osutils.get_umask())
 
228
        finally:
 
229
            os.umask(orig_umask)
 
230
 
 
231
    def assertFormatedDelta(self, expected, seconds):
 
232
        """Assert osutils.format_delta formats as expected"""
 
233
        actual = osutils.format_delta(seconds)
 
234
        self.assertEqual(expected, actual)
 
235
 
 
236
    def test_format_delta(self):
 
237
        self.assertFormatedDelta('0 seconds ago', 0)
 
238
        self.assertFormatedDelta('1 second ago', 1)
 
239
        self.assertFormatedDelta('10 seconds ago', 10)
 
240
        self.assertFormatedDelta('59 seconds ago', 59)
 
241
        self.assertFormatedDelta('89 seconds ago', 89)
 
242
        self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
 
243
        self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
 
244
        self.assertFormatedDelta('3 minutes, 1 second ago', 181)
 
245
        self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
 
246
        self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
 
247
        self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
 
248
        self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
 
249
        self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
 
250
        self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
 
251
        self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
 
252
        self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
 
253
        self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
 
254
        self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
 
255
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
 
256
        self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
 
257
        self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
 
258
        self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
 
259
        self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
 
260
 
 
261
        # We handle when time steps the wrong direction because computers
 
262
        # don't have synchronized clocks.
 
263
        self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
 
264
        self.assertFormatedDelta('1 second in the future', -1)
 
265
        self.assertFormatedDelta('2 seconds in the future', -2)
 
266
 
 
267
    def test_format_date(self):
 
268
        self.assertRaises(errors.UnsupportedTimezoneFormat,
 
269
            osutils.format_date, 0, timezone='foo')
 
270
 
 
271
    def test_dereference_path(self):
 
272
        self.requireFeature(SymlinkFeature)
 
273
        cwd = osutils.realpath('.')
 
274
        os.mkdir('bar')
 
275
        bar_path = osutils.pathjoin(cwd, 'bar')
 
276
        # Using './' to avoid bug #1213894 (first path component not
 
277
        # dereferenced) in Python 2.4.1 and earlier
 
278
        self.assertEqual(bar_path, osutils.realpath('./bar'))
 
279
        os.symlink('bar', 'foo')
 
280
        self.assertEqual(bar_path, osutils.realpath('./foo'))
 
281
        
 
282
        # Does not dereference terminal symlinks
 
283
        foo_path = osutils.pathjoin(cwd, 'foo')
 
284
        self.assertEqual(foo_path, osutils.dereference_path('./foo'))
 
285
 
 
286
        # Dereferences parent symlinks
 
287
        os.mkdir('bar/baz')
 
288
        baz_path = osutils.pathjoin(bar_path, 'baz')
 
289
        self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
 
290
 
 
291
        # Dereferences parent symlinks that are the first path element
 
292
        self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
 
293
 
 
294
        # Dereferences parent symlinks in absolute paths
 
295
        foo_baz_path = osutils.pathjoin(foo_path, 'baz')
 
296
        self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
 
297
 
 
298
    def test_changing_access(self):
 
299
        f = file('file', 'w')
 
300
        f.write('monkey')
 
301
        f.close()
 
302
 
 
303
        # Make a file readonly
 
304
        osutils.make_readonly('file')
 
305
        mode = os.lstat('file').st_mode
 
306
        self.assertEqual(mode, mode & 0777555)
 
307
 
 
308
        # Make a file writable
 
309
        osutils.make_writable('file')
 
310
        mode = os.lstat('file').st_mode
 
311
        self.assertEqual(mode, mode | 0200)
 
312
 
 
313
        if osutils.has_symlinks():
 
314
            # should not error when handed a symlink
 
315
            os.symlink('nonexistent', 'dangling')
 
316
            osutils.make_readonly('dangling')
 
317
            osutils.make_writable('dangling')
 
318
 
 
319
    def test_kind_marker(self):
 
320
        self.assertEqual("", osutils.kind_marker("file"))
 
321
        self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
 
322
        self.assertEqual("@", osutils.kind_marker("symlink"))
 
323
        self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
 
324
 
 
325
    def test_host_os_dereferences_symlinks(self):
 
326
        osutils.host_os_dereferences_symlinks()
 
327
 
 
328
 
 
329
class TestPumpFile(TestCase):
 
330
    """Test pumpfile method."""
 
331
    def setUp(self):
 
332
        # create a test datablock
 
333
        self.block_size = 512
 
334
        pattern = '0123456789ABCDEF'
 
335
        self.test_data = pattern * (3 * self.block_size / len(pattern))
 
336
        self.test_data_len = len(self.test_data)
 
337
 
 
338
    def test_bracket_block_size(self):
 
339
        """Read data in blocks with the requested read size bracketing the
 
340
        block size."""
 
341
        # make sure test data is larger than max read size
 
342
        self.assertTrue(self.test_data_len > self.block_size)
 
343
 
 
344
        from_file = FakeReadFile(self.test_data)
 
345
        to_file = StringIO()
 
346
 
 
347
        # read (max / 2) bytes and verify read size wasn't affected
 
348
        num_bytes_to_read = self.block_size / 2
 
349
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
350
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
351
        self.assertEqual(from_file.get_read_count(), 1)
 
352
 
 
353
        # read (max) bytes and verify read size wasn't affected
 
354
        num_bytes_to_read = self.block_size
 
355
        from_file.reset_read_count()
 
356
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
357
        self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
 
358
        self.assertEqual(from_file.get_read_count(), 1)
 
359
 
 
360
        # read (max + 1) bytes and verify read size was limited
 
361
        num_bytes_to_read = self.block_size + 1
 
362
        from_file.reset_read_count()
 
363
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
364
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
365
        self.assertEqual(from_file.get_read_count(), 2)
 
366
 
 
367
        # finish reading the rest of the data
 
368
        num_bytes_to_read = self.test_data_len - to_file.tell()
 
369
        pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
 
370
 
 
371
        # report error if the data wasn't equal (we only report the size due
 
372
        # to the length of the data)
 
373
        response_data = to_file.getvalue()
 
374
        if response_data != self.test_data:
 
375
            message = "Data not equal.  Expected %d bytes, received %d."
 
376
            self.fail(message % (len(response_data), self.test_data_len))
 
377
 
 
378
    def test_specified_size(self):
 
379
        """Request a transfer larger than the maximum block size and verify
 
380
        that the maximum read doesn't exceed the block_size."""
 
381
        # make sure test data is larger than max read size
 
382
        self.assertTrue(self.test_data_len > self.block_size)
 
383
 
 
384
        # retrieve data in blocks
 
385
        from_file = FakeReadFile(self.test_data)
 
386
        to_file = StringIO()
 
387
        pumpfile(from_file, to_file, self.test_data_len, self.block_size)
 
388
 
 
389
        # verify read size was equal to the maximum read size
 
390
        self.assertTrue(from_file.get_max_read_size() > 0)
 
391
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
392
        self.assertEqual(from_file.get_read_count(), 3)
 
393
 
 
394
        # report error if the data wasn't equal (we only report the size due
 
395
        # to the length of the data)
 
396
        response_data = to_file.getvalue()
 
397
        if response_data != self.test_data:
 
398
            message = "Data not equal.  Expected %d bytes, received %d."
 
399
            self.fail(message % (len(response_data), self.test_data_len))
 
400
 
 
401
    def test_to_eof(self):
 
402
        """Read to end-of-file and verify that the reads are not larger than
 
403
        the maximum read size."""
 
404
        # make sure test data is larger than max read size
 
405
        self.assertTrue(self.test_data_len > self.block_size)
 
406
 
 
407
        # retrieve data to EOF
 
408
        from_file = FakeReadFile(self.test_data)
 
409
        to_file = StringIO()
 
410
        pumpfile(from_file, to_file, -1, self.block_size)
 
411
 
 
412
        # verify read size was equal to the maximum read size
 
413
        self.assertEqual(from_file.get_max_read_size(), self.block_size)
 
414
        self.assertEqual(from_file.get_read_count(), 4)
 
415
 
 
416
        # report error if the data wasn't equal (we only report the size due
 
417
        # to the length of the data)
 
418
        response_data = to_file.getvalue()
 
419
        if response_data != self.test_data:
 
420
            message = "Data not equal.  Expected %d bytes, received %d."
 
421
            self.fail(message % (len(response_data), self.test_data_len))
 
422
 
 
423
    def test_defaults(self):
 
424
        """Verifies that the default arguments will read to EOF -- this
 
425
        test verifies that any existing usages of pumpfile will not be broken
 
426
        with this new version."""
 
427
        # retrieve data using default (old) pumpfile method
 
428
        from_file = FakeReadFile(self.test_data)
 
429
        to_file = StringIO()
 
430
        pumpfile(from_file, to_file)
 
431
 
 
432
        # report error if the data wasn't equal (we only report the size due
 
433
        # to the length of the data)
 
434
        response_data = to_file.getvalue()
 
435
        if response_data != self.test_data:
 
436
            message = "Data not equal.  Expected %d bytes, received %d."
 
437
            self.fail(message % (len(response_data), self.test_data_len))
 
438
 
 
439
class TestSafeUnicode(TestCase):
 
440
 
 
441
    def test_from_ascii_string(self):
 
442
        self.assertEqual(u'foobar', osutils.safe_unicode('foobar'))
 
443
 
 
444
    def test_from_unicode_string_ascii_contents(self):
 
445
        self.assertEqual(u'bargam', osutils.safe_unicode(u'bargam'))
 
446
 
 
447
    def test_from_unicode_string_unicode_contents(self):
 
448
        self.assertEqual(u'bargam\xae', osutils.safe_unicode(u'bargam\xae'))
 
449
 
 
450
    def test_from_utf8_string(self):
 
451
        self.assertEqual(u'foo\xae', osutils.safe_unicode('foo\xc2\xae'))
 
452
 
 
453
    def test_bad_utf8_string(self):
 
454
        self.assertRaises(BzrBadParameterNotUnicode,
 
455
                          osutils.safe_unicode,
 
456
                          '\xbb\xbb')
 
457
 
 
458
 
 
459
class TestSafeUtf8(TestCase):
 
460
 
 
461
    def test_from_ascii_string(self):
 
462
        f = 'foobar'
 
463
        self.assertEqual('foobar', osutils.safe_utf8(f))
 
464
 
 
465
    def test_from_unicode_string_ascii_contents(self):
 
466
        self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
 
467
 
 
468
    def test_from_unicode_string_unicode_contents(self):
 
469
        self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
 
470
 
 
471
    def test_from_utf8_string(self):
 
472
        self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
 
473
 
 
474
    def test_bad_utf8_string(self):
 
475
        self.assertRaises(BzrBadParameterNotUnicode,
 
476
                          osutils.safe_utf8, '\xbb\xbb')
 
477
 
 
478
 
 
479
class TestSafeRevisionId(TestCase):
 
480
 
 
481
    def test_from_ascii_string(self):
 
482
        # this shouldn't give a warning because it's getting an ascii string
 
483
        self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
 
484
 
 
485
    def test_from_unicode_string_ascii_contents(self):
 
486
        self.assertEqual('bargam',
 
487
                         osutils.safe_revision_id(u'bargam', warn=False))
 
488
 
 
489
    def test_from_unicode_deprecated(self):
 
490
        self.assertEqual('bargam',
 
491
            self.callDeprecated([osutils._revision_id_warning],
 
492
                                osutils.safe_revision_id, u'bargam'))
 
493
 
 
494
    def test_from_unicode_string_unicode_contents(self):
 
495
        self.assertEqual('bargam\xc2\xae',
 
496
                         osutils.safe_revision_id(u'bargam\xae', warn=False))
 
497
 
 
498
    def test_from_utf8_string(self):
 
499
        self.assertEqual('foo\xc2\xae',
 
500
                         osutils.safe_revision_id('foo\xc2\xae'))
 
501
 
 
502
    def test_none(self):
 
503
        """Currently, None is a valid revision_id"""
 
504
        self.assertEqual(None, osutils.safe_revision_id(None))
 
505
 
 
506
 
 
507
class TestSafeFileId(TestCase):
 
508
 
 
509
    def test_from_ascii_string(self):
 
510
        self.assertEqual('foobar', osutils.safe_file_id('foobar'))
 
511
 
 
512
    def test_from_unicode_string_ascii_contents(self):
 
513
        self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
 
514
 
 
515
    def test_from_unicode_deprecated(self):
 
516
        self.assertEqual('bargam',
 
517
            self.callDeprecated([osutils._file_id_warning],
 
518
                                osutils.safe_file_id, u'bargam'))
 
519
 
 
520
    def test_from_unicode_string_unicode_contents(self):
 
521
        self.assertEqual('bargam\xc2\xae',
 
522
                         osutils.safe_file_id(u'bargam\xae', warn=False))
 
523
 
 
524
    def test_from_utf8_string(self):
 
525
        self.assertEqual('foo\xc2\xae',
 
526
                         osutils.safe_file_id('foo\xc2\xae'))
 
527
 
 
528
    def test_none(self):
 
529
        """Currently, None is a valid revision_id"""
 
530
        self.assertEqual(None, osutils.safe_file_id(None))
 
531
 
 
532
 
 
533
class TestWin32Funcs(TestCase):
 
534
    """Test that the _win32 versions of os utilities return appropriate paths."""
 
535
 
 
536
    def test_abspath(self):
 
537
        self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
 
538
        self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
 
539
        self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
 
540
        self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
 
541
 
 
542
    def test_realpath(self):
 
543
        self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
 
544
        self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
 
545
 
 
546
    def test_pathjoin(self):
 
547
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
 
548
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
 
549
        self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
 
550
        self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
 
551
        self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
 
552
        self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
 
553
 
 
554
    def test_normpath(self):
 
555
        self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
 
556
        self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
 
557
 
 
558
    def test_getcwd(self):
 
559
        cwd = osutils._win32_getcwd()
 
560
        os_cwd = os.getcwdu()
 
561
        self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
 
562
        # win32 is inconsistent whether it returns lower or upper case
 
563
        # and even if it was consistent the user might type the other
 
564
        # so we force it to uppercase
 
565
        # running python.exe under cmd.exe return capital C:\\
 
566
        # running win32 python inside a cygwin shell returns lowercase
 
567
        self.assertEqual(os_cwd[0].upper(), cwd[0])
 
568
 
 
569
    def test_fixdrive(self):
 
570
        self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
 
571
        self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
 
572
        self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
 
573
 
 
574
    def test_win98_abspath(self):
 
575
        # absolute path
 
576
        self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
 
577
        self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
 
578
        # UNC path
 
579
        self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
 
580
        self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
 
581
        # relative path
 
582
        cwd = osutils.getcwd().rstrip('/')
 
583
        drive = osutils._nt_splitdrive(cwd)[0]
 
584
        self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
 
585
        self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
 
586
        # unicode path
 
587
        u = u'\u1234'
 
588
        self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
 
589
 
 
590
 
 
591
class TestWin32FuncsDirs(TestCaseInTempDir):
 
592
    """Test win32 functions that create files."""
 
593
    
 
594
    def test_getcwd(self):
 
595
        if win32utils.winver == 'Windows 98':
 
596
            raise TestSkipped('Windows 98 cannot handle unicode filenames')
 
597
        # Make sure getcwd can handle unicode filenames
 
598
        try:
 
599
            os.mkdir(u'mu-\xb5')
 
600
        except UnicodeError:
 
601
            raise TestSkipped("Unable to create Unicode filename")
 
602
 
 
603
        os.chdir(u'mu-\xb5')
 
604
        # TODO: jam 20060427 This will probably fail on Mac OSX because
 
605
        #       it will change the normalization of B\xe5gfors
 
606
        #       Consider using a different unicode character, or make
 
607
        #       osutils.getcwd() renormalize the path.
 
608
        self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
 
609
 
 
610
    def test_minimum_path_selection(self):
 
611
        self.assertEqual(set(),
 
612
            osutils.minimum_path_selection([]))
 
613
        self.assertEqual(set(['a', 'b']),
 
614
            osutils.minimum_path_selection(['a', 'b']))
 
615
        self.assertEqual(set(['a/', 'b']),
 
616
            osutils.minimum_path_selection(['a/', 'b']))
 
617
        self.assertEqual(set(['a/', 'b']),
 
618
            osutils.minimum_path_selection(['a/c', 'a/', 'b']))
 
619
 
 
620
    def test_mkdtemp(self):
 
621
        tmpdir = osutils._win32_mkdtemp(dir='.')
 
622
        self.assertFalse('\\' in tmpdir)
 
623
 
 
624
    def test_rename(self):
 
625
        a = open('a', 'wb')
 
626
        a.write('foo\n')
 
627
        a.close()
 
628
        b = open('b', 'wb')
 
629
        b.write('baz\n')
 
630
        b.close()
 
631
 
 
632
        osutils._win32_rename('b', 'a')
 
633
        self.failUnlessExists('a')
 
634
        self.failIfExists('b')
 
635
        self.assertFileEqual('baz\n', 'a')
 
636
 
 
637
    def test_rename_missing_file(self):
 
638
        a = open('a', 'wb')
 
639
        a.write('foo\n')
 
640
        a.close()
 
641
 
 
642
        try:
 
643
            osutils._win32_rename('b', 'a')
 
644
        except (IOError, OSError), e:
 
645
            self.assertEqual(errno.ENOENT, e.errno)
 
646
        self.assertFileEqual('foo\n', 'a')
 
647
 
 
648
    def test_rename_missing_dir(self):
 
649
        os.mkdir('a')
 
650
        try:
 
651
            osutils._win32_rename('b', 'a')
 
652
        except (IOError, OSError), e:
 
653
            self.assertEqual(errno.ENOENT, e.errno)
 
654
 
 
655
    def test_rename_current_dir(self):
 
656
        os.mkdir('a')
 
657
        os.chdir('a')
 
658
        # You can't rename the working directory
 
659
        # doing rename non-existant . usually
 
660
        # just raises ENOENT, since non-existant
 
661
        # doesn't exist.
 
662
        try:
 
663
            osutils._win32_rename('b', '.')
 
664
        except (IOError, OSError), e:
 
665
            self.assertEqual(errno.ENOENT, e.errno)
 
666
 
 
667
    def test_splitpath(self):
 
668
        def check(expected, path):
 
669
            self.assertEqual(expected, osutils.splitpath(path))
 
670
 
 
671
        check(['a'], 'a')
 
672
        check(['a', 'b'], 'a/b')
 
673
        check(['a', 'b'], 'a/./b')
 
674
        check(['a', '.b'], 'a/.b')
 
675
        check(['a', '.b'], 'a\\.b')
 
676
 
 
677
        self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
 
678
 
 
679
 
 
680
class TestMacFuncsDirs(TestCaseInTempDir):
 
681
    """Test mac special functions that require directories."""
 
682
 
 
683
    def test_getcwd(self):
 
684
        # On Mac, this will actually create Ba\u030agfors
 
685
        # but chdir will still work, because it accepts both paths
 
686
        try:
 
687
            os.mkdir(u'B\xe5gfors')
 
688
        except UnicodeError:
 
689
            raise TestSkipped("Unable to create Unicode filename")
 
690
 
 
691
        os.chdir(u'B\xe5gfors')
 
692
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
693
 
 
694
    def test_getcwd_nonnorm(self):
 
695
        # Test that _mac_getcwd() will normalize this path
 
696
        try:
 
697
            os.mkdir(u'Ba\u030agfors')
 
698
        except UnicodeError:
 
699
            raise TestSkipped("Unable to create Unicode filename")
 
700
 
 
701
        os.chdir(u'Ba\u030agfors')
 
702
        self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
 
703
 
 
704
 
 
705
class TestSplitLines(TestCase):
 
706
 
 
707
    def test_split_unicode(self):
 
708
        self.assertEqual([u'foo\n', u'bar\xae'],
 
709
                         osutils.split_lines(u'foo\nbar\xae'))
 
710
        self.assertEqual([u'foo\n', u'bar\xae\n'],
 
711
                         osutils.split_lines(u'foo\nbar\xae\n'))
 
712
 
 
713
    def test_split_with_carriage_returns(self):
 
714
        self.assertEqual(['foo\rbar\n'],
 
715
                         osutils.split_lines('foo\rbar\n'))
 
716
 
 
717
 
 
718
class TestWalkDirs(TestCaseInTempDir):
 
719
 
 
720
    def test_walkdirs(self):
 
721
        tree = [
 
722
            '.bzr',
 
723
            '0file',
 
724
            '1dir/',
 
725
            '1dir/0file',
 
726
            '1dir/1dir/',
 
727
            '2file'
 
728
            ]
 
729
        self.build_tree(tree)
 
730
        expected_dirblocks = [
 
731
                (('', '.'),
 
732
                 [('0file', '0file', 'file'),
 
733
                  ('1dir', '1dir', 'directory'),
 
734
                  ('2file', '2file', 'file'),
 
735
                 ]
 
736
                ),
 
737
                (('1dir', './1dir'),
 
738
                 [('1dir/0file', '0file', 'file'),
 
739
                  ('1dir/1dir', '1dir', 'directory'),
 
740
                 ]
 
741
                ),
 
742
                (('1dir/1dir', './1dir/1dir'),
 
743
                 [
 
744
                 ]
 
745
                ),
 
746
            ]
 
747
        result = []
 
748
        found_bzrdir = False
 
749
        for dirdetail, dirblock in osutils.walkdirs('.'):
 
750
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
751
                # this tests the filtering of selected paths
 
752
                found_bzrdir = True
 
753
                del dirblock[0]
 
754
            result.append((dirdetail, dirblock))
 
755
 
 
756
        self.assertTrue(found_bzrdir)
 
757
        self.assertEqual(expected_dirblocks,
 
758
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
759
        # you can search a subdir only, with a supplied prefix.
 
760
        result = []
 
761
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
762
            result.append(dirblock)
 
763
        self.assertEqual(expected_dirblocks[1:],
 
764
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
765
 
 
766
    def test__walkdirs_utf8(self):
 
767
        tree = [
 
768
            '.bzr',
 
769
            '0file',
 
770
            '1dir/',
 
771
            '1dir/0file',
 
772
            '1dir/1dir/',
 
773
            '2file'
 
774
            ]
 
775
        self.build_tree(tree)
 
776
        expected_dirblocks = [
 
777
                (('', '.'),
 
778
                 [('0file', '0file', 'file'),
 
779
                  ('1dir', '1dir', 'directory'),
 
780
                  ('2file', '2file', 'file'),
 
781
                 ]
 
782
                ),
 
783
                (('1dir', './1dir'),
 
784
                 [('1dir/0file', '0file', 'file'),
 
785
                  ('1dir/1dir', '1dir', 'directory'),
 
786
                 ]
 
787
                ),
 
788
                (('1dir/1dir', './1dir/1dir'),
 
789
                 [
 
790
                 ]
 
791
                ),
 
792
            ]
 
793
        result = []
 
794
        found_bzrdir = False
 
795
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
796
            if len(dirblock) and dirblock[0][1] == '.bzr':
 
797
                # this tests the filtering of selected paths
 
798
                found_bzrdir = True
 
799
                del dirblock[0]
 
800
            result.append((dirdetail, dirblock))
 
801
 
 
802
        self.assertTrue(found_bzrdir)
 
803
        self.assertEqual(expected_dirblocks,
 
804
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
805
        # you can search a subdir only, with a supplied prefix.
 
806
        result = []
 
807
        for dirblock in osutils.walkdirs('./1dir', '1dir'):
 
808
            result.append(dirblock)
 
809
        self.assertEqual(expected_dirblocks[1:],
 
810
            [(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
 
811
 
 
812
    def _filter_out_stat(self, result):
 
813
        """Filter out the stat value from the walkdirs result"""
 
814
        for dirdetail, dirblock in result:
 
815
            new_dirblock = []
 
816
            for info in dirblock:
 
817
                # Ignore info[3] which is the stat
 
818
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
819
            dirblock[:] = new_dirblock
 
820
 
 
821
    def test_unicode_walkdirs(self):
 
822
        """Walkdirs should always return unicode paths."""
 
823
        name0 = u'0file-\xb6'
 
824
        name1 = u'1dir-\u062c\u0648'
 
825
        name2 = u'2file-\u0633'
 
826
        tree = [
 
827
            name0,
 
828
            name1 + '/',
 
829
            name1 + '/' + name0,
 
830
            name1 + '/' + name1 + '/',
 
831
            name2,
 
832
            ]
 
833
        try:
 
834
            self.build_tree(tree)
 
835
        except UnicodeError:
 
836
            raise TestSkipped('Could not represent Unicode chars'
 
837
                              ' in current encoding.')
 
838
        expected_dirblocks = [
 
839
                ((u'', u'.'),
 
840
                 [(name0, name0, 'file', './' + name0),
 
841
                  (name1, name1, 'directory', './' + name1),
 
842
                  (name2, name2, 'file', './' + name2),
 
843
                 ]
 
844
                ),
 
845
                ((name1, './' + name1),
 
846
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
847
                                                        + '/' + name0),
 
848
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
849
                                                            + '/' + name1),
 
850
                 ]
 
851
                ),
 
852
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
853
                 [
 
854
                 ]
 
855
                ),
 
856
            ]
 
857
        result = list(osutils.walkdirs('.'))
 
858
        self._filter_out_stat(result)
 
859
        self.assertEqual(expected_dirblocks, result)
 
860
        result = list(osutils.walkdirs(u'./'+name1, name1))
 
861
        self._filter_out_stat(result)
 
862
        self.assertEqual(expected_dirblocks[1:], result)
 
863
 
 
864
    def test_unicode__walkdirs_utf8(self):
 
865
        """Walkdirs_utf8 should always return utf8 paths.
 
866
 
 
867
        The abspath portion might be in unicode or utf-8
 
868
        """
 
869
        name0 = u'0file-\xb6'
 
870
        name1 = u'1dir-\u062c\u0648'
 
871
        name2 = u'2file-\u0633'
 
872
        tree = [
 
873
            name0,
 
874
            name1 + '/',
 
875
            name1 + '/' + name0,
 
876
            name1 + '/' + name1 + '/',
 
877
            name2,
 
878
            ]
 
879
        try:
 
880
            self.build_tree(tree)
 
881
        except UnicodeError:
 
882
            raise TestSkipped('Could not represent Unicode chars'
 
883
                              ' in current encoding.')
 
884
        name0 = name0.encode('utf8')
 
885
        name1 = name1.encode('utf8')
 
886
        name2 = name2.encode('utf8')
 
887
 
 
888
        expected_dirblocks = [
 
889
                (('', '.'),
 
890
                 [(name0, name0, 'file', './' + name0),
 
891
                  (name1, name1, 'directory', './' + name1),
 
892
                  (name2, name2, 'file', './' + name2),
 
893
                 ]
 
894
                ),
 
895
                ((name1, './' + name1),
 
896
                 [(name1 + '/' + name0, name0, 'file', './' + name1
 
897
                                                        + '/' + name0),
 
898
                  (name1 + '/' + name1, name1, 'directory', './' + name1
 
899
                                                            + '/' + name1),
 
900
                 ]
 
901
                ),
 
902
                ((name1 + '/' + name1, './' + name1 + '/' + name1),
 
903
                 [
 
904
                 ]
 
905
                ),
 
906
            ]
 
907
        result = []
 
908
        # For ease in testing, if walkdirs_utf8 returns Unicode, assert that
 
909
        # all abspaths are Unicode, and encode them back into utf8.
 
910
        for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
 
911
            self.assertIsInstance(dirdetail[0], str)
 
912
            if isinstance(dirdetail[1], unicode):
 
913
                dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
 
914
                dirblock = [list(info) for info in dirblock]
 
915
                for info in dirblock:
 
916
                    self.assertIsInstance(info[4], unicode)
 
917
                    info[4] = info[4].encode('utf8')
 
918
            new_dirblock = []
 
919
            for info in dirblock:
 
920
                self.assertIsInstance(info[0], str)
 
921
                self.assertIsInstance(info[1], str)
 
922
                self.assertIsInstance(info[4], str)
 
923
                # Remove the stat information
 
924
                new_dirblock.append((info[0], info[1], info[2], info[4]))
 
925
            result.append((dirdetail, new_dirblock))
 
926
        self.assertEqual(expected_dirblocks, result)
 
927
 
 
928
    def test_unicode__walkdirs_unicode_to_utf8(self):
 
929
        """walkdirs_unicode_to_utf8 should be a safe fallback everywhere
 
930
 
 
931
        The abspath portion should be in unicode
 
932
        """
 
933
        name0u = u'0file-\xb6'
 
934
        name1u = u'1dir-\u062c\u0648'
 
935
        name2u = u'2file-\u0633'
 
936
        tree = [
 
937
            name0u,
 
938
            name1u + '/',
 
939
            name1u + '/' + name0u,
 
940
            name1u + '/' + name1u + '/',
 
941
            name2u,
 
942
            ]
 
943
        try:
 
944
            self.build_tree(tree)
 
945
        except UnicodeError:
 
946
            raise TestSkipped('Could not represent Unicode chars'
 
947
                              ' in current encoding.')
 
948
        name0 = name0u.encode('utf8')
 
949
        name1 = name1u.encode('utf8')
 
950
        name2 = name2u.encode('utf8')
 
951
 
 
952
        # All of the abspaths should be in unicode, all of the relative paths
 
953
        # should be in utf8
 
954
        expected_dirblocks = [
 
955
                (('', '.'),
 
956
                 [(name0, name0, 'file', './' + name0u),
 
957
                  (name1, name1, 'directory', './' + name1u),
 
958
                  (name2, name2, 'file', './' + name2u),
 
959
                 ]
 
960
                ),
 
961
                ((name1, './' + name1u),
 
962
                 [(name1 + '/' + name0, name0, 'file', './' + name1u
 
963
                                                        + '/' + name0u),
 
964
                  (name1 + '/' + name1, name1, 'directory', './' + name1u
 
965
                                                            + '/' + name1u),
 
966
                 ]
 
967
                ),
 
968
                ((name1 + '/' + name1, './' + name1u + '/' + name1u),
 
969
                 [
 
970
                 ]
 
971
                ),
 
972
            ]
 
973
        result = list(osutils._walkdirs_unicode_to_utf8('.'))
 
974
        self._filter_out_stat(result)
 
975
        self.assertEqual(expected_dirblocks, result)
 
976
 
 
977
    def assertPathCompare(self, path_less, path_greater):
 
978
        """check that path_less and path_greater compare correctly."""
 
979
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
980
            path_less, path_less))
 
981
        self.assertEqual(0, osutils.compare_paths_prefix_order(
 
982
            path_greater, path_greater))
 
983
        self.assertEqual(-1, osutils.compare_paths_prefix_order(
 
984
            path_less, path_greater))
 
985
        self.assertEqual(1, osutils.compare_paths_prefix_order(
 
986
            path_greater, path_less))
 
987
 
 
988
    def test_compare_paths_prefix_order(self):
 
989
        # root before all else
 
990
        self.assertPathCompare("/", "/a")
 
991
        # alpha within a dir
 
992
        self.assertPathCompare("/a", "/b")
 
993
        self.assertPathCompare("/b", "/z")
 
994
        # high dirs before lower.
 
995
        self.assertPathCompare("/z", "/a/a")
 
996
        # except if the deeper dir should be output first
 
997
        self.assertPathCompare("/a/b/c", "/d/g")
 
998
        # lexical betwen dirs of the same height
 
999
        self.assertPathCompare("/a/z", "/z/z")
 
1000
        self.assertPathCompare("/a/c/z", "/a/d/e")
 
1001
 
 
1002
        # this should also be consistent for no leading / paths
 
1003
        # root before all else
 
1004
        self.assertPathCompare("", "a")
 
1005
        # alpha within a dir
 
1006
        self.assertPathCompare("a", "b")
 
1007
        self.assertPathCompare("b", "z")
 
1008
        # high dirs before lower.
 
1009
        self.assertPathCompare("z", "a/a")
 
1010
        # except if the deeper dir should be output first
 
1011
        self.assertPathCompare("a/b/c", "d/g")
 
1012
        # lexical betwen dirs of the same height
 
1013
        self.assertPathCompare("a/z", "z/z")
 
1014
        self.assertPathCompare("a/c/z", "a/d/e")
 
1015
 
 
1016
    def test_path_prefix_sorting(self):
 
1017
        """Doing a sort on path prefix should match our sample data."""
 
1018
        original_paths = [
 
1019
            'a',
 
1020
            'a/b',
 
1021
            'a/b/c',
 
1022
            'b',
 
1023
            'b/c',
 
1024
            'd',
 
1025
            'd/e',
 
1026
            'd/e/f',
 
1027
            'd/f',
 
1028
            'd/g',
 
1029
            'g',
 
1030
            ]
 
1031
 
 
1032
        dir_sorted_paths = [
 
1033
            'a',
 
1034
            'b',
 
1035
            'd',
 
1036
            'g',
 
1037
            'a/b',
 
1038
            'a/b/c',
 
1039
            'b/c',
 
1040
            'd/e',
 
1041
            'd/f',
 
1042
            'd/g',
 
1043
            'd/e/f',
 
1044
            ]
 
1045
 
 
1046
        self.assertEqual(
 
1047
            dir_sorted_paths,
 
1048
            sorted(original_paths, key=osutils.path_prefix_key))
 
1049
        # using the comparison routine shoudl work too:
 
1050
        self.assertEqual(
 
1051
            dir_sorted_paths,
 
1052
            sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
 
1053
 
 
1054
 
 
1055
class TestCopyTree(TestCaseInTempDir):
 
1056
    
 
1057
    def test_copy_basic_tree(self):
 
1058
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1059
        osutils.copy_tree('source', 'target')
 
1060
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1061
        self.assertEqual(['c'], os.listdir('target/b'))
 
1062
 
 
1063
    def test_copy_tree_target_exists(self):
 
1064
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
 
1065
                         'target/'])
 
1066
        osutils.copy_tree('source', 'target')
 
1067
        self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
 
1068
        self.assertEqual(['c'], os.listdir('target/b'))
 
1069
 
 
1070
    def test_copy_tree_symlinks(self):
 
1071
        self.requireFeature(SymlinkFeature)
 
1072
        self.build_tree(['source/'])
 
1073
        os.symlink('a/generic/path', 'source/lnk')
 
1074
        osutils.copy_tree('source', 'target')
 
1075
        self.assertEqual(['lnk'], os.listdir('target'))
 
1076
        self.assertEqual('a/generic/path', os.readlink('target/lnk'))
 
1077
 
 
1078
    def test_copy_tree_handlers(self):
 
1079
        processed_files = []
 
1080
        processed_links = []
 
1081
        def file_handler(from_path, to_path):
 
1082
            processed_files.append(('f', from_path, to_path))
 
1083
        def dir_handler(from_path, to_path):
 
1084
            processed_files.append(('d', from_path, to_path))
 
1085
        def link_handler(from_path, to_path):
 
1086
            processed_links.append((from_path, to_path))
 
1087
        handlers = {'file':file_handler,
 
1088
                    'directory':dir_handler,
 
1089
                    'symlink':link_handler,
 
1090
                   }
 
1091
 
 
1092
        self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
 
1093
        if osutils.has_symlinks():
 
1094
            os.symlink('a/generic/path', 'source/lnk')
 
1095
        osutils.copy_tree('source', 'target', handlers=handlers)
 
1096
 
 
1097
        self.assertEqual([('d', 'source', 'target'),
 
1098
                          ('f', 'source/a', 'target/a'),
 
1099
                          ('d', 'source/b', 'target/b'),
 
1100
                          ('f', 'source/b/c', 'target/b/c'),
 
1101
                         ], processed_files)
 
1102
        self.failIfExists('target')
 
1103
        if osutils.has_symlinks():
 
1104
            self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
 
1105
 
 
1106
 
 
1107
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
 
1108
# [bialix] 2006/12/26
 
1109
 
 
1110
 
 
1111
class TestSetUnsetEnv(TestCase):
 
1112
    """Test updating the environment"""
 
1113
 
 
1114
    def setUp(self):
 
1115
        super(TestSetUnsetEnv, self).setUp()
 
1116
 
 
1117
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
 
1118
                         'Environment was not cleaned up properly.'
 
1119
                         ' Variable BZR_TEST_ENV_VAR should not exist.')
 
1120
        def cleanup():
 
1121
            if 'BZR_TEST_ENV_VAR' in os.environ:
 
1122
                del os.environ['BZR_TEST_ENV_VAR']
 
1123
 
 
1124
        self.addCleanup(cleanup)
 
1125
 
 
1126
    def test_set(self):
 
1127
        """Test that we can set an env variable"""
 
1128
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1129
        self.assertEqual(None, old)
 
1130
        self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
 
1131
 
 
1132
    def test_double_set(self):
 
1133
        """Test that we get the old value out"""
 
1134
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1135
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
 
1136
        self.assertEqual('foo', old)
 
1137
        self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
 
1138
 
 
1139
    def test_unicode(self):
 
1140
        """Environment can only contain plain strings
 
1141
        
 
1142
        So Unicode strings must be encoded.
 
1143
        """
 
1144
        uni_val, env_val = probe_unicode_in_user_encoding()
 
1145
        if uni_val is None:
 
1146
            raise TestSkipped('Cannot find a unicode character that works in'
 
1147
                              ' encoding %s' % (bzrlib.user_encoding,))
 
1148
 
 
1149
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
 
1150
        self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
 
1151
 
 
1152
    def test_unset(self):
 
1153
        """Test that passing None will remove the env var"""
 
1154
        osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
 
1155
        old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
 
1156
        self.assertEqual('foo', old)
 
1157
        self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
 
1158
        self.failIf('BZR_TEST_ENV_VAR' in os.environ)
 
1159
 
 
1160
 
 
1161
class TestLocalTimeOffset(TestCase):
 
1162
 
 
1163
    def test_local_time_offset(self):
 
1164
        """Test that local_time_offset() returns a sane value."""
 
1165
        offset = osutils.local_time_offset()
 
1166
        self.assertTrue(isinstance(offset, int))
 
1167
        # Test that the offset is no more than a eighteen hours in
 
1168
        # either direction.
 
1169
        # Time zone handling is system specific, so it is difficult to
 
1170
        # do more specific tests, but a value outside of this range is
 
1171
        # probably wrong.
 
1172
        eighteen_hours = 18 * 3600
 
1173
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1174
 
 
1175
    def test_local_time_offset_with_timestamp(self):
 
1176
        """Test that local_time_offset() works with a timestamp."""
 
1177
        offset = osutils.local_time_offset(1000000000.1234567)
 
1178
        self.assertTrue(isinstance(offset, int))
 
1179
        eighteen_hours = 18 * 3600
 
1180
        self.assertTrue(-eighteen_hours < offset < eighteen_hours)
 
1181
 
 
1182
 
 
1183
class TestShaFileByName(TestCaseInTempDir):
 
1184
 
 
1185
    def test_sha_empty(self):
 
1186
        self.build_tree_contents([('foo', '')])
 
1187
        expected_sha = osutils.sha_string('')
 
1188
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1189
 
 
1190
    def test_sha_mixed_endings(self):
 
1191
        text = 'test\r\nwith\nall\rpossible line endings\r\n'
 
1192
        self.build_tree_contents([('foo', text)])
 
1193
        expected_sha = osutils.sha_string(text)
 
1194
        self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
 
1195
 
 
1196
 
 
1197
_debug_text = \
 
1198
r'''# Copyright (C) 2005, 2006 Canonical Ltd
 
1199
#
 
1200
# This program is free software; you can redistribute it and/or modify
 
1201
# it under the terms of the GNU General Public License as published by
 
1202
# the Free Software Foundation; either version 2 of the License, or
 
1203
# (at your option) any later version.
 
1204
#
 
1205
# This program is distributed in the hope that it will be useful,
 
1206
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
1207
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1208
# GNU General Public License for more details.
 
1209
#
 
1210
# You should have received a copy of the GNU General Public License
 
1211
# along with this program; if not, write to the Free Software
 
1212
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
1213
 
 
1214
 
 
1215
# NOTE: If update these, please also update the help for global-options in
 
1216
#       bzrlib/help_topics/__init__.py
 
1217
 
 
1218
debug_flags = set()
 
1219
"""Set of flags that enable different debug behaviour.
 
1220
 
 
1221
These are set with eg ``-Dlock`` on the bzr command line.
 
1222
 
 
1223
Options include:
 
1224
 
 
1225
 * auth - show authentication sections used
 
1226
 * error - show stack traces for all top level exceptions
 
1227
 * evil - capture call sites that do expensive or badly-scaling operations.
 
1228
 * fetch - trace history copying between repositories
 
1229
 * graph - trace graph traversal information
 
1230
 * hashcache - log every time a working file is read to determine its hash
 
1231
 * hooks - trace hook execution
 
1232
 * hpss - trace smart protocol requests and responses
 
1233
 * http - trace http connections, requests and responses
 
1234
 * index - trace major index operations
 
1235
 * knit - trace knit operations
 
1236
 * lock - trace when lockdir locks are taken or released
 
1237
 * merge - emit information for debugging merges
 
1238
 * pack - emit information about pack operations
 
1239
 
 
1240
"""
 
1241
'''
 
1242
 
 
1243
 
 
1244
class TestResourceLoading(TestCaseInTempDir):
 
1245
 
 
1246
    def test_resource_string(self):
 
1247
        # test resource in bzrlib
 
1248
        text = osutils.resource_string('bzrlib', 'debug.py')
 
1249
        self.assertEquals(_debug_text, text)
 
1250
        # test resource under bzrlib
 
1251
        text = osutils.resource_string('bzrlib.ui', 'text.py')
 
1252
        self.assertContainsRe(text, "class TextUIFactory")
 
1253
        # test unsupported package
 
1254
        self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
 
1255
            'yyy.xx')
 
1256
        # test unknown resource
 
1257
        self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')