71
117
self.assertEqual(type(result), str)
72
118
self.assertContainsRe(result, r'^[a-z0-9]{100}$')
120
def test_is_inside(self):
121
is_inside = osutils.is_inside
122
self.assertTrue(is_inside('src', 'src/foo.c'))
123
self.assertFalse(is_inside('src', 'srccontrol'))
124
self.assertTrue(is_inside('src', 'src/a/a/a/foo.c'))
125
self.assertTrue(is_inside('foo.c', 'foo.c'))
126
self.assertFalse(is_inside('foo.c', ''))
127
self.assertTrue(is_inside('', 'foo.c'))
129
def test_is_inside_any(self):
130
SRC_FOO_C = pathjoin('src', 'foo.c')
131
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
132
(['src'], SRC_FOO_C),
135
self.assert_(is_inside_any(dirs, fn))
136
for dirs, fn in [(['src'], 'srccontrol'),
137
(['src'], 'srccontrol/foo')]:
138
self.assertFalse(is_inside_any(dirs, fn))
140
def test_is_inside_or_parent_of_any(self):
141
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
142
(['src'], 'src/foo.c'),
143
(['src/bar.c'], 'src'),
144
(['src/bar.c', 'bla/foo.c'], 'src'),
147
self.assert_(is_inside_or_parent_of_any(dirs, fn))
149
for dirs, fn in [(['src'], 'srccontrol'),
150
(['srccontrol/foo.c'], 'src'),
151
(['src'], 'srccontrol/foo')]:
152
self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
154
def test_rmtree(self):
155
# Check to remove tree with read-only files/dirs
157
f = file('dir/file', 'w')
160
# would like to also try making the directory readonly, but at the
161
# moment python shutil.rmtree doesn't handle that properly - it would
162
# need to chmod the directory before removing things inside it - deferred
163
# for now -- mbp 20060505
164
# osutils.make_readonly('dir')
165
osutils.make_readonly('dir/file')
167
osutils.rmtree('dir')
169
self.failIfExists('dir/file')
170
self.failIfExists('dir')
172
def test_file_kind(self):
173
self.build_tree(['file', 'dir/'])
174
self.assertEquals('file', osutils.file_kind('file'))
175
self.assertEquals('directory', osutils.file_kind('dir/'))
176
if osutils.has_symlinks():
177
os.symlink('symlink', 'symlink')
178
self.assertEquals('symlink', osutils.file_kind('symlink'))
180
# TODO: jam 20060529 Test a block device
182
os.lstat('/dev/null')
184
if e.errno not in (errno.ENOENT,):
187
self.assertEquals('chardev', osutils.file_kind('/dev/null'))
189
mkfifo = getattr(os, 'mkfifo', None)
193
self.assertEquals('fifo', osutils.file_kind('fifo'))
197
AF_UNIX = getattr(socket, 'AF_UNIX', None)
199
s = socket.socket(AF_UNIX)
202
self.assertEquals('socket', osutils.file_kind('socket'))
206
def test_kind_marker(self):
207
self.assertEqual(osutils.kind_marker('file'), '')
208
self.assertEqual(osutils.kind_marker('directory'), '/')
209
self.assertEqual(osutils.kind_marker('symlink'), '@')
210
self.assertEqual(osutils.kind_marker('tree-reference'), '+')
212
def test_get_umask(self):
213
if sys.platform == 'win32':
214
# umask always returns '0', no way to set it
215
self.assertEqual(0, osutils.get_umask())
218
orig_umask = osutils.get_umask()
221
self.assertEqual(0222, osutils.get_umask())
223
self.assertEqual(0022, osutils.get_umask())
225
self.assertEqual(0002, osutils.get_umask())
227
self.assertEqual(0027, osutils.get_umask())
231
def assertFormatedDelta(self, expected, seconds):
232
"""Assert osutils.format_delta formats as expected"""
233
actual = osutils.format_delta(seconds)
234
self.assertEqual(expected, actual)
236
def test_format_delta(self):
237
self.assertFormatedDelta('0 seconds ago', 0)
238
self.assertFormatedDelta('1 second ago', 1)
239
self.assertFormatedDelta('10 seconds ago', 10)
240
self.assertFormatedDelta('59 seconds ago', 59)
241
self.assertFormatedDelta('89 seconds ago', 89)
242
self.assertFormatedDelta('1 minute, 30 seconds ago', 90)
243
self.assertFormatedDelta('3 minutes, 0 seconds ago', 180)
244
self.assertFormatedDelta('3 minutes, 1 second ago', 181)
245
self.assertFormatedDelta('10 minutes, 15 seconds ago', 615)
246
self.assertFormatedDelta('30 minutes, 59 seconds ago', 1859)
247
self.assertFormatedDelta('31 minutes, 0 seconds ago', 1860)
248
self.assertFormatedDelta('60 minutes, 0 seconds ago', 3600)
249
self.assertFormatedDelta('89 minutes, 59 seconds ago', 5399)
250
self.assertFormatedDelta('1 hour, 30 minutes ago', 5400)
251
self.assertFormatedDelta('2 hours, 30 minutes ago', 9017)
252
self.assertFormatedDelta('10 hours, 0 minutes ago', 36000)
253
self.assertFormatedDelta('24 hours, 0 minutes ago', 86400)
254
self.assertFormatedDelta('35 hours, 59 minutes ago', 129599)
255
self.assertFormatedDelta('36 hours, 0 minutes ago', 129600)
256
self.assertFormatedDelta('36 hours, 0 minutes ago', 129601)
257
self.assertFormatedDelta('36 hours, 1 minute ago', 129660)
258
self.assertFormatedDelta('36 hours, 1 minute ago', 129661)
259
self.assertFormatedDelta('84 hours, 10 minutes ago', 303002)
261
# We handle when time steps the wrong direction because computers
262
# don't have synchronized clocks.
263
self.assertFormatedDelta('84 hours, 10 minutes in the future', -303002)
264
self.assertFormatedDelta('1 second in the future', -1)
265
self.assertFormatedDelta('2 seconds in the future', -2)
267
def test_format_date(self):
268
self.assertRaises(errors.UnsupportedTimezoneFormat,
269
osutils.format_date, 0, timezone='foo')
271
def test_dereference_path(self):
272
self.requireFeature(SymlinkFeature)
273
cwd = osutils.realpath('.')
275
bar_path = osutils.pathjoin(cwd, 'bar')
276
# Using './' to avoid bug #1213894 (first path component not
277
# dereferenced) in Python 2.4.1 and earlier
278
self.assertEqual(bar_path, osutils.realpath('./bar'))
279
os.symlink('bar', 'foo')
280
self.assertEqual(bar_path, osutils.realpath('./foo'))
282
# Does not dereference terminal symlinks
283
foo_path = osutils.pathjoin(cwd, 'foo')
284
self.assertEqual(foo_path, osutils.dereference_path('./foo'))
286
# Dereferences parent symlinks
288
baz_path = osutils.pathjoin(bar_path, 'baz')
289
self.assertEqual(baz_path, osutils.dereference_path('./foo/baz'))
291
# Dereferences parent symlinks that are the first path element
292
self.assertEqual(baz_path, osutils.dereference_path('foo/baz'))
294
# Dereferences parent symlinks in absolute paths
295
foo_baz_path = osutils.pathjoin(foo_path, 'baz')
296
self.assertEqual(baz_path, osutils.dereference_path(foo_baz_path))
298
def test_changing_access(self):
299
f = file('file', 'w')
303
# Make a file readonly
304
osutils.make_readonly('file')
305
mode = os.lstat('file').st_mode
306
self.assertEqual(mode, mode & 0777555)
308
# Make a file writable
309
osutils.make_writable('file')
310
mode = os.lstat('file').st_mode
311
self.assertEqual(mode, mode | 0200)
313
if osutils.has_symlinks():
314
# should not error when handed a symlink
315
os.symlink('nonexistent', 'dangling')
316
osutils.make_readonly('dangling')
317
osutils.make_writable('dangling')
319
def test_kind_marker(self):
320
self.assertEqual("", osutils.kind_marker("file"))
321
self.assertEqual("/", osutils.kind_marker(osutils._directory_kind))
322
self.assertEqual("@", osutils.kind_marker("symlink"))
323
self.assertRaises(errors.BzrError, osutils.kind_marker, "unknown")
326
class TestPumpFile(TestCase):
327
"""Test pumpfile method."""
329
# create a test datablock
330
self.block_size = 512
331
pattern = '0123456789ABCDEF'
332
self.test_data = pattern * (3 * self.block_size / len(pattern))
333
self.test_data_len = len(self.test_data)
335
def test_bracket_block_size(self):
336
"""Read data in blocks with the requested read size bracketing the
338
# make sure test data is larger than max read size
339
self.assertTrue(self.test_data_len > self.block_size)
341
from_file = FakeReadFile(self.test_data)
344
# read (max / 2) bytes and verify read size wasn't affected
345
num_bytes_to_read = self.block_size / 2
346
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
347
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
348
self.assertEqual(from_file.get_read_count(), 1)
350
# read (max) bytes and verify read size wasn't affected
351
num_bytes_to_read = self.block_size
352
from_file.reset_read_count()
353
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
354
self.assertEqual(from_file.get_max_read_size(), num_bytes_to_read)
355
self.assertEqual(from_file.get_read_count(), 1)
357
# read (max + 1) bytes and verify read size was limited
358
num_bytes_to_read = self.block_size + 1
359
from_file.reset_read_count()
360
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
361
self.assertEqual(from_file.get_max_read_size(), self.block_size)
362
self.assertEqual(from_file.get_read_count(), 2)
364
# finish reading the rest of the data
365
num_bytes_to_read = self.test_data_len - to_file.tell()
366
pumpfile(from_file, to_file, num_bytes_to_read, self.block_size)
368
# report error if the data wasn't equal (we only report the size due
369
# to the length of the data)
370
response_data = to_file.getvalue()
371
if response_data != self.test_data:
372
message = "Data not equal. Expected %d bytes, received %d."
373
self.fail(message % (len(response_data), self.test_data_len))
375
def test_specified_size(self):
376
"""Request a transfer larger than the maximum block size and verify
377
that the maximum read doesn't exceed the block_size."""
378
# make sure test data is larger than max read size
379
self.assertTrue(self.test_data_len > self.block_size)
381
# retrieve data in blocks
382
from_file = FakeReadFile(self.test_data)
384
pumpfile(from_file, to_file, self.test_data_len, self.block_size)
386
# verify read size was equal to the maximum read size
387
self.assertTrue(from_file.get_max_read_size() > 0)
388
self.assertEqual(from_file.get_max_read_size(), self.block_size)
389
self.assertEqual(from_file.get_read_count(), 3)
391
# report error if the data wasn't equal (we only report the size due
392
# to the length of the data)
393
response_data = to_file.getvalue()
394
if response_data != self.test_data:
395
message = "Data not equal. Expected %d bytes, received %d."
396
self.fail(message % (len(response_data), self.test_data_len))
398
def test_to_eof(self):
399
"""Read to end-of-file and verify that the reads are not larger than
400
the maximum read size."""
401
# make sure test data is larger than max read size
402
self.assertTrue(self.test_data_len > self.block_size)
404
# retrieve data to EOF
405
from_file = FakeReadFile(self.test_data)
407
pumpfile(from_file, to_file, -1, self.block_size)
409
# verify read size was equal to the maximum read size
410
self.assertEqual(from_file.get_max_read_size(), self.block_size)
411
self.assertEqual(from_file.get_read_count(), 4)
413
# report error if the data wasn't equal (we only report the size due
414
# to the length of the data)
415
response_data = to_file.getvalue()
416
if response_data != self.test_data:
417
message = "Data not equal. Expected %d bytes, received %d."
418
self.fail(message % (len(response_data), self.test_data_len))
420
def test_defaults(self):
421
"""Verifies that the default arguments will read to EOF -- this
422
test verifies that any existing usages of pumpfile will not be broken
423
with this new version."""
424
# retrieve data using default (old) pumpfile method
425
from_file = FakeReadFile(self.test_data)
427
pumpfile(from_file, to_file)
429
# report error if the data wasn't equal (we only report the size due
430
# to the length of the data)
431
response_data = to_file.getvalue()
432
if response_data != self.test_data:
433
message = "Data not equal. Expected %d bytes, received %d."
434
self.fail(message % (len(response_data), self.test_data_len))
75
436
class TestSafeUnicode(TestCase):
90
451
self.assertRaises(BzrBadParameterNotUnicode,
91
452
osutils.safe_unicode,
456
class TestSafeUtf8(TestCase):
458
def test_from_ascii_string(self):
460
self.assertEqual('foobar', osutils.safe_utf8(f))
462
def test_from_unicode_string_ascii_contents(self):
463
self.assertEqual('bargam', osutils.safe_utf8(u'bargam'))
465
def test_from_unicode_string_unicode_contents(self):
466
self.assertEqual('bargam\xc2\xae', osutils.safe_utf8(u'bargam\xae'))
468
def test_from_utf8_string(self):
469
self.assertEqual('foo\xc2\xae', osutils.safe_utf8('foo\xc2\xae'))
471
def test_bad_utf8_string(self):
472
self.assertRaises(BzrBadParameterNotUnicode,
473
osutils.safe_utf8, '\xbb\xbb')
476
class TestSafeRevisionId(TestCase):
478
def test_from_ascii_string(self):
479
# this shouldn't give a warning because it's getting an ascii string
480
self.assertEqual('foobar', osutils.safe_revision_id('foobar'))
482
def test_from_unicode_string_ascii_contents(self):
483
self.assertEqual('bargam',
484
osutils.safe_revision_id(u'bargam', warn=False))
486
def test_from_unicode_deprecated(self):
487
self.assertEqual('bargam',
488
self.callDeprecated([osutils._revision_id_warning],
489
osutils.safe_revision_id, u'bargam'))
491
def test_from_unicode_string_unicode_contents(self):
492
self.assertEqual('bargam\xc2\xae',
493
osutils.safe_revision_id(u'bargam\xae', warn=False))
495
def test_from_utf8_string(self):
496
self.assertEqual('foo\xc2\xae',
497
osutils.safe_revision_id('foo\xc2\xae'))
500
"""Currently, None is a valid revision_id"""
501
self.assertEqual(None, osutils.safe_revision_id(None))
504
class TestSafeFileId(TestCase):
506
def test_from_ascii_string(self):
507
self.assertEqual('foobar', osutils.safe_file_id('foobar'))
509
def test_from_unicode_string_ascii_contents(self):
510
self.assertEqual('bargam', osutils.safe_file_id(u'bargam', warn=False))
512
def test_from_unicode_deprecated(self):
513
self.assertEqual('bargam',
514
self.callDeprecated([osutils._file_id_warning],
515
osutils.safe_file_id, u'bargam'))
517
def test_from_unicode_string_unicode_contents(self):
518
self.assertEqual('bargam\xc2\xae',
519
osutils.safe_file_id(u'bargam\xae', warn=False))
521
def test_from_utf8_string(self):
522
self.assertEqual('foo\xc2\xae',
523
osutils.safe_file_id('foo\xc2\xae'))
526
"""Currently, None is a valid revision_id"""
527
self.assertEqual(None, osutils.safe_file_id(None))
530
class TestWin32Funcs(TestCase):
531
"""Test that the _win32 versions of os utilities return appropriate paths."""
533
def test_abspath(self):
534
self.assertEqual('C:/foo', osutils._win32_abspath('C:\\foo'))
535
self.assertEqual('C:/foo', osutils._win32_abspath('C:/foo'))
536
self.assertEqual('//HOST/path', osutils._win32_abspath(r'\\HOST\path'))
537
self.assertEqual('//HOST/path', osutils._win32_abspath('//HOST/path'))
539
def test_realpath(self):
540
self.assertEqual('C:/foo', osutils._win32_realpath('C:\\foo'))
541
self.assertEqual('C:/foo', osutils._win32_realpath('C:/foo'))
543
def test_pathjoin(self):
544
self.assertEqual('path/to/foo', osutils._win32_pathjoin('path', 'to', 'foo'))
545
self.assertEqual('C:/foo', osutils._win32_pathjoin('path\\to', 'C:\\foo'))
546
self.assertEqual('C:/foo', osutils._win32_pathjoin('path/to', 'C:/foo'))
547
self.assertEqual('path/to/foo', osutils._win32_pathjoin('path/to/', 'foo'))
548
self.assertEqual('/foo', osutils._win32_pathjoin('C:/path/to/', '/foo'))
549
self.assertEqual('/foo', osutils._win32_pathjoin('C:\\path\\to\\', '\\foo'))
551
def test_normpath(self):
552
self.assertEqual('path/to/foo', osutils._win32_normpath(r'path\\from\..\to\.\foo'))
553
self.assertEqual('path/to/foo', osutils._win32_normpath('path//from/../to/./foo'))
555
def test_getcwd(self):
556
cwd = osutils._win32_getcwd()
557
os_cwd = os.getcwdu()
558
self.assertEqual(os_cwd[1:].replace('\\', '/'), cwd[1:])
559
# win32 is inconsistent whether it returns lower or upper case
560
# and even if it was consistent the user might type the other
561
# so we force it to uppercase
562
# running python.exe under cmd.exe return capital C:\\
563
# running win32 python inside a cygwin shell returns lowercase
564
self.assertEqual(os_cwd[0].upper(), cwd[0])
566
def test_fixdrive(self):
567
self.assertEqual('H:/foo', osutils._win32_fixdrive('h:/foo'))
568
self.assertEqual('H:/foo', osutils._win32_fixdrive('H:/foo'))
569
self.assertEqual('C:\\foo', osutils._win32_fixdrive('c:\\foo'))
571
def test_win98_abspath(self):
573
self.assertEqual('C:/foo', osutils._win98_abspath('C:\\foo'))
574
self.assertEqual('C:/foo', osutils._win98_abspath('C:/foo'))
576
self.assertEqual('//HOST/path', osutils._win98_abspath(r'\\HOST\path'))
577
self.assertEqual('//HOST/path', osutils._win98_abspath('//HOST/path'))
579
cwd = osutils.getcwd().rstrip('/')
580
drive = osutils._nt_splitdrive(cwd)[0]
581
self.assertEqual(cwd+'/path', osutils._win98_abspath('path'))
582
self.assertEqual(drive+'/path', osutils._win98_abspath('/path'))
585
self.assertEqual(cwd+'/'+u, osutils._win98_abspath(u))
588
class TestWin32FuncsDirs(TestCaseInTempDir):
589
"""Test win32 functions that create files."""
591
def test_getcwd(self):
592
if win32utils.winver == 'Windows 98':
593
raise TestSkipped('Windows 98 cannot handle unicode filenames')
594
# Make sure getcwd can handle unicode filenames
598
raise TestSkipped("Unable to create Unicode filename")
601
# TODO: jam 20060427 This will probably fail on Mac OSX because
602
# it will change the normalization of B\xe5gfors
603
# Consider using a different unicode character, or make
604
# osutils.getcwd() renormalize the path.
605
self.assertEndsWith(osutils._win32_getcwd(), u'mu-\xb5')
607
def test_minimum_path_selection(self):
608
self.assertEqual(set(),
609
osutils.minimum_path_selection([]))
610
self.assertEqual(set(['a', 'b']),
611
osutils.minimum_path_selection(['a', 'b']))
612
self.assertEqual(set(['a/', 'b']),
613
osutils.minimum_path_selection(['a/', 'b']))
614
self.assertEqual(set(['a/', 'b']),
615
osutils.minimum_path_selection(['a/c', 'a/', 'b']))
617
def test_mkdtemp(self):
618
tmpdir = osutils._win32_mkdtemp(dir='.')
619
self.assertFalse('\\' in tmpdir)
621
def test_rename(self):
629
osutils._win32_rename('b', 'a')
630
self.failUnlessExists('a')
631
self.failIfExists('b')
632
self.assertFileEqual('baz\n', 'a')
634
def test_rename_missing_file(self):
640
osutils._win32_rename('b', 'a')
641
except (IOError, OSError), e:
642
self.assertEqual(errno.ENOENT, e.errno)
643
self.assertFileEqual('foo\n', 'a')
645
def test_rename_missing_dir(self):
648
osutils._win32_rename('b', 'a')
649
except (IOError, OSError), e:
650
self.assertEqual(errno.ENOENT, e.errno)
652
def test_rename_current_dir(self):
655
# You can't rename the working directory
656
# doing rename non-existant . usually
657
# just raises ENOENT, since non-existant
660
osutils._win32_rename('b', '.')
661
except (IOError, OSError), e:
662
self.assertEqual(errno.ENOENT, e.errno)
664
def test_splitpath(self):
665
def check(expected, path):
666
self.assertEqual(expected, osutils.splitpath(path))
669
check(['a', 'b'], 'a/b')
670
check(['a', 'b'], 'a/./b')
671
check(['a', '.b'], 'a/.b')
672
check(['a', '.b'], 'a\\.b')
674
self.assertRaises(errors.BzrError, osutils.splitpath, 'a/../b')
677
class TestMacFuncsDirs(TestCaseInTempDir):
678
"""Test mac special functions that require directories."""
680
def test_getcwd(self):
681
# On Mac, this will actually create Ba\u030agfors
682
# but chdir will still work, because it accepts both paths
684
os.mkdir(u'B\xe5gfors')
686
raise TestSkipped("Unable to create Unicode filename")
688
os.chdir(u'B\xe5gfors')
689
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
691
def test_getcwd_nonnorm(self):
692
# Test that _mac_getcwd() will normalize this path
694
os.mkdir(u'Ba\u030agfors')
696
raise TestSkipped("Unable to create Unicode filename")
698
os.chdir(u'Ba\u030agfors')
699
self.assertEndsWith(osutils._mac_getcwd(), u'B\xe5gfors')
702
class TestSplitLines(TestCase):
704
def test_split_unicode(self):
705
self.assertEqual([u'foo\n', u'bar\xae'],
706
osutils.split_lines(u'foo\nbar\xae'))
707
self.assertEqual([u'foo\n', u'bar\xae\n'],
708
osutils.split_lines(u'foo\nbar\xae\n'))
710
def test_split_with_carriage_returns(self):
711
self.assertEqual(['foo\rbar\n'],
712
osutils.split_lines('foo\rbar\n'))
715
class TestWalkDirs(TestCaseInTempDir):
717
def test_walkdirs(self):
726
self.build_tree(tree)
727
expected_dirblocks = [
729
[('0file', '0file', 'file'),
730
('1dir', '1dir', 'directory'),
731
('2file', '2file', 'file'),
735
[('1dir/0file', '0file', 'file'),
736
('1dir/1dir', '1dir', 'directory'),
739
(('1dir/1dir', './1dir/1dir'),
746
for dirdetail, dirblock in osutils.walkdirs('.'):
747
if len(dirblock) and dirblock[0][1] == '.bzr':
748
# this tests the filtering of selected paths
751
result.append((dirdetail, dirblock))
753
self.assertTrue(found_bzrdir)
754
self.assertEqual(expected_dirblocks,
755
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
756
# you can search a subdir only, with a supplied prefix.
758
for dirblock in osutils.walkdirs('./1dir', '1dir'):
759
result.append(dirblock)
760
self.assertEqual(expected_dirblocks[1:],
761
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
763
def test__walkdirs_utf8(self):
772
self.build_tree(tree)
773
expected_dirblocks = [
775
[('0file', '0file', 'file'),
776
('1dir', '1dir', 'directory'),
777
('2file', '2file', 'file'),
781
[('1dir/0file', '0file', 'file'),
782
('1dir/1dir', '1dir', 'directory'),
785
(('1dir/1dir', './1dir/1dir'),
792
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
793
if len(dirblock) and dirblock[0][1] == '.bzr':
794
# this tests the filtering of selected paths
797
result.append((dirdetail, dirblock))
799
self.assertTrue(found_bzrdir)
800
self.assertEqual(expected_dirblocks,
801
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
802
# you can search a subdir only, with a supplied prefix.
804
for dirblock in osutils.walkdirs('./1dir', '1dir'):
805
result.append(dirblock)
806
self.assertEqual(expected_dirblocks[1:],
807
[(dirinfo, [line[0:3] for line in block]) for dirinfo, block in result])
809
def _filter_out_stat(self, result):
810
"""Filter out the stat value from the walkdirs result"""
811
for dirdetail, dirblock in result:
813
for info in dirblock:
814
# Ignore info[3] which is the stat
815
new_dirblock.append((info[0], info[1], info[2], info[4]))
816
dirblock[:] = new_dirblock
818
def test_unicode_walkdirs(self):
819
"""Walkdirs should always return unicode paths."""
820
name0 = u'0file-\xb6'
821
name1 = u'1dir-\u062c\u0648'
822
name2 = u'2file-\u0633'
827
name1 + '/' + name1 + '/',
831
self.build_tree(tree)
833
raise TestSkipped('Could not represent Unicode chars'
834
' in current encoding.')
835
expected_dirblocks = [
837
[(name0, name0, 'file', './' + name0),
838
(name1, name1, 'directory', './' + name1),
839
(name2, name2, 'file', './' + name2),
842
((name1, './' + name1),
843
[(name1 + '/' + name0, name0, 'file', './' + name1
845
(name1 + '/' + name1, name1, 'directory', './' + name1
849
((name1 + '/' + name1, './' + name1 + '/' + name1),
854
result = list(osutils.walkdirs('.'))
855
self._filter_out_stat(result)
856
self.assertEqual(expected_dirblocks, result)
857
result = list(osutils.walkdirs(u'./'+name1, name1))
858
self._filter_out_stat(result)
859
self.assertEqual(expected_dirblocks[1:], result)
861
def test_unicode__walkdirs_utf8(self):
862
"""Walkdirs_utf8 should always return utf8 paths.
864
The abspath portion might be in unicode or utf-8
866
name0 = u'0file-\xb6'
867
name1 = u'1dir-\u062c\u0648'
868
name2 = u'2file-\u0633'
873
name1 + '/' + name1 + '/',
877
self.build_tree(tree)
879
raise TestSkipped('Could not represent Unicode chars'
880
' in current encoding.')
881
name0 = name0.encode('utf8')
882
name1 = name1.encode('utf8')
883
name2 = name2.encode('utf8')
885
expected_dirblocks = [
887
[(name0, name0, 'file', './' + name0),
888
(name1, name1, 'directory', './' + name1),
889
(name2, name2, 'file', './' + name2),
892
((name1, './' + name1),
893
[(name1 + '/' + name0, name0, 'file', './' + name1
895
(name1 + '/' + name1, name1, 'directory', './' + name1
899
((name1 + '/' + name1, './' + name1 + '/' + name1),
905
# For ease in testing, if walkdirs_utf8 returns Unicode, assert that
906
# all abspaths are Unicode, and encode them back into utf8.
907
for dirdetail, dirblock in osutils._walkdirs_utf8('.'):
908
self.assertIsInstance(dirdetail[0], str)
909
if isinstance(dirdetail[1], unicode):
910
dirdetail = (dirdetail[0], dirdetail[1].encode('utf8'))
911
dirblock = [list(info) for info in dirblock]
912
for info in dirblock:
913
self.assertIsInstance(info[4], unicode)
914
info[4] = info[4].encode('utf8')
916
for info in dirblock:
917
self.assertIsInstance(info[0], str)
918
self.assertIsInstance(info[1], str)
919
self.assertIsInstance(info[4], str)
920
# Remove the stat information
921
new_dirblock.append((info[0], info[1], info[2], info[4]))
922
result.append((dirdetail, new_dirblock))
923
self.assertEqual(expected_dirblocks, result)
925
def test_unicode__walkdirs_unicode_to_utf8(self):
926
"""walkdirs_unicode_to_utf8 should be a safe fallback everywhere
928
The abspath portion should be in unicode
930
name0u = u'0file-\xb6'
931
name1u = u'1dir-\u062c\u0648'
932
name2u = u'2file-\u0633'
936
name1u + '/' + name0u,
937
name1u + '/' + name1u + '/',
941
self.build_tree(tree)
943
raise TestSkipped('Could not represent Unicode chars'
944
' in current encoding.')
945
name0 = name0u.encode('utf8')
946
name1 = name1u.encode('utf8')
947
name2 = name2u.encode('utf8')
949
# All of the abspaths should be in unicode, all of the relative paths
951
expected_dirblocks = [
953
[(name0, name0, 'file', './' + name0u),
954
(name1, name1, 'directory', './' + name1u),
955
(name2, name2, 'file', './' + name2u),
958
((name1, './' + name1u),
959
[(name1 + '/' + name0, name0, 'file', './' + name1u
961
(name1 + '/' + name1, name1, 'directory', './' + name1u
965
((name1 + '/' + name1, './' + name1u + '/' + name1u),
970
result = list(osutils._walkdirs_unicode_to_utf8('.'))
971
self._filter_out_stat(result)
972
self.assertEqual(expected_dirblocks, result)
974
def assertPathCompare(self, path_less, path_greater):
975
"""check that path_less and path_greater compare correctly."""
976
self.assertEqual(0, osutils.compare_paths_prefix_order(
977
path_less, path_less))
978
self.assertEqual(0, osutils.compare_paths_prefix_order(
979
path_greater, path_greater))
980
self.assertEqual(-1, osutils.compare_paths_prefix_order(
981
path_less, path_greater))
982
self.assertEqual(1, osutils.compare_paths_prefix_order(
983
path_greater, path_less))
985
def test_compare_paths_prefix_order(self):
986
# root before all else
987
self.assertPathCompare("/", "/a")
989
self.assertPathCompare("/a", "/b")
990
self.assertPathCompare("/b", "/z")
991
# high dirs before lower.
992
self.assertPathCompare("/z", "/a/a")
993
# except if the deeper dir should be output first
994
self.assertPathCompare("/a/b/c", "/d/g")
995
# lexical betwen dirs of the same height
996
self.assertPathCompare("/a/z", "/z/z")
997
self.assertPathCompare("/a/c/z", "/a/d/e")
999
# this should also be consistent for no leading / paths
1000
# root before all else
1001
self.assertPathCompare("", "a")
1002
# alpha within a dir
1003
self.assertPathCompare("a", "b")
1004
self.assertPathCompare("b", "z")
1005
# high dirs before lower.
1006
self.assertPathCompare("z", "a/a")
1007
# except if the deeper dir should be output first
1008
self.assertPathCompare("a/b/c", "d/g")
1009
# lexical betwen dirs of the same height
1010
self.assertPathCompare("a/z", "z/z")
1011
self.assertPathCompare("a/c/z", "a/d/e")
1013
def test_path_prefix_sorting(self):
1014
"""Doing a sort on path prefix should match our sample data."""
1029
dir_sorted_paths = [
1045
sorted(original_paths, key=osutils.path_prefix_key))
1046
# using the comparison routine shoudl work too:
1049
sorted(original_paths, cmp=osutils.compare_paths_prefix_order))
1052
class TestCopyTree(TestCaseInTempDir):
1054
def test_copy_basic_tree(self):
1055
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1056
osutils.copy_tree('source', 'target')
1057
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1058
self.assertEqual(['c'], os.listdir('target/b'))
1060
def test_copy_tree_target_exists(self):
1061
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c',
1063
osutils.copy_tree('source', 'target')
1064
self.assertEqual(['a', 'b'], sorted(os.listdir('target')))
1065
self.assertEqual(['c'], os.listdir('target/b'))
1067
def test_copy_tree_symlinks(self):
1068
self.requireFeature(SymlinkFeature)
1069
self.build_tree(['source/'])
1070
os.symlink('a/generic/path', 'source/lnk')
1071
osutils.copy_tree('source', 'target')
1072
self.assertEqual(['lnk'], os.listdir('target'))
1073
self.assertEqual('a/generic/path', os.readlink('target/lnk'))
1075
def test_copy_tree_handlers(self):
1076
processed_files = []
1077
processed_links = []
1078
def file_handler(from_path, to_path):
1079
processed_files.append(('f', from_path, to_path))
1080
def dir_handler(from_path, to_path):
1081
processed_files.append(('d', from_path, to_path))
1082
def link_handler(from_path, to_path):
1083
processed_links.append((from_path, to_path))
1084
handlers = {'file':file_handler,
1085
'directory':dir_handler,
1086
'symlink':link_handler,
1089
self.build_tree(['source/', 'source/a', 'source/b/', 'source/b/c'])
1090
if osutils.has_symlinks():
1091
os.symlink('a/generic/path', 'source/lnk')
1092
osutils.copy_tree('source', 'target', handlers=handlers)
1094
self.assertEqual([('d', 'source', 'target'),
1095
('f', 'source/a', 'target/a'),
1096
('d', 'source/b', 'target/b'),
1097
('f', 'source/b/c', 'target/b/c'),
1099
self.failIfExists('target')
1100
if osutils.has_symlinks():
1101
self.assertEqual([('source/lnk', 'target/lnk')], processed_links)
1104
#class TestTerminalEncoding has been moved to test_osutils_encodings.py
1105
# [bialix] 2006/12/26
1108
class TestSetUnsetEnv(TestCase):
1109
"""Test updating the environment"""
1112
super(TestSetUnsetEnv, self).setUp()
1114
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'),
1115
'Environment was not cleaned up properly.'
1116
' Variable BZR_TEST_ENV_VAR should not exist.')
1118
if 'BZR_TEST_ENV_VAR' in os.environ:
1119
del os.environ['BZR_TEST_ENV_VAR']
1121
self.addCleanup(cleanup)
1124
"""Test that we can set an env variable"""
1125
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1126
self.assertEqual(None, old)
1127
self.assertEqual('foo', os.environ.get('BZR_TEST_ENV_VAR'))
1129
def test_double_set(self):
1130
"""Test that we get the old value out"""
1131
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1132
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'bar')
1133
self.assertEqual('foo', old)
1134
self.assertEqual('bar', os.environ.get('BZR_TEST_ENV_VAR'))
1136
def test_unicode(self):
1137
"""Environment can only contain plain strings
1139
So Unicode strings must be encoded.
1141
uni_val, env_val = probe_unicode_in_user_encoding()
1143
raise TestSkipped('Cannot find a unicode character that works in'
1144
' encoding %s' % (bzrlib.user_encoding,))
1146
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', uni_val)
1147
self.assertEqual(env_val, os.environ.get('BZR_TEST_ENV_VAR'))
1149
def test_unset(self):
1150
"""Test that passing None will remove the env var"""
1151
osutils.set_or_unset_env('BZR_TEST_ENV_VAR', 'foo')
1152
old = osutils.set_or_unset_env('BZR_TEST_ENV_VAR', None)
1153
self.assertEqual('foo', old)
1154
self.assertEqual(None, os.environ.get('BZR_TEST_ENV_VAR'))
1155
self.failIf('BZR_TEST_ENV_VAR' in os.environ)
1158
class TestLocalTimeOffset(TestCase):
1160
def test_local_time_offset(self):
1161
"""Test that local_time_offset() returns a sane value."""
1162
offset = osutils.local_time_offset()
1163
self.assertTrue(isinstance(offset, int))
1164
# Test that the offset is no more than a eighteen hours in
1166
# Time zone handling is system specific, so it is difficult to
1167
# do more specific tests, but a value outside of this range is
1169
eighteen_hours = 18 * 3600
1170
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1172
def test_local_time_offset_with_timestamp(self):
1173
"""Test that local_time_offset() works with a timestamp."""
1174
offset = osutils.local_time_offset(1000000000.1234567)
1175
self.assertTrue(isinstance(offset, int))
1176
eighteen_hours = 18 * 3600
1177
self.assertTrue(-eighteen_hours < offset < eighteen_hours)
1180
class TestShaFileByName(TestCaseInTempDir):
1182
def test_sha_empty(self):
1183
self.build_tree_contents([('foo', '')])
1184
expected_sha = osutils.sha_string('')
1185
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1187
def test_sha_mixed_endings(self):
1188
text = 'test\r\nwith\nall\rpossible line endings\r\n'
1189
self.build_tree_contents([('foo', text)])
1190
expected_sha = osutils.sha_string(text)
1191
self.assertEqual(expected_sha, osutils.sha_file_by_name('foo'))
1195
r'''# Copyright (C) 2005, 2006 Canonical Ltd
1197
# This program is free software; you can redistribute it and/or modify
1198
# it under the terms of the GNU General Public License as published by
1199
# the Free Software Foundation; either version 2 of the License, or
1200
# (at your option) any later version.
1202
# This program is distributed in the hope that it will be useful,
1203
# but WITHOUT ANY WARRANTY; without even the implied warranty of
1204
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1205
# GNU General Public License for more details.
1207
# You should have received a copy of the GNU General Public License
1208
# along with this program; if not, write to the Free Software
1209
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
1212
# NOTE: If update these, please also update the help for global-options in
1213
# bzrlib/help_topics/__init__.py
1216
"""Set of flags that enable different debug behaviour.
1218
These are set with eg ``-Dlock`` on the bzr command line.
1222
* auth - show authentication sections used
1223
* error - show stack traces for all top level exceptions
1224
* evil - capture call sites that do expensive or badly-scaling operations.
1225
* fetch - trace history copying between repositories
1226
* graph - trace graph traversal information
1227
* hashcache - log every time a working file is read to determine its hash
1228
* hooks - trace hook execution
1229
* hpss - trace smart protocol requests and responses
1230
* http - trace http connections, requests and responses
1231
* index - trace major index operations
1232
* knit - trace knit operations
1233
* lock - trace when lockdir locks are taken or released
1234
* merge - emit information for debugging merges
1235
* pack - emit information about pack operations
1241
class TestResourceLoading(TestCaseInTempDir):
1243
def test_resource_string(self):
1244
# test resource in bzrlib
1245
text = osutils.resource_string('bzrlib', 'debug.py')
1246
self.assertEquals(_debug_text, text)
1247
# test resource under bzrlib
1248
text = osutils.resource_string('bzrlib.ui', 'text.py')
1249
self.assertContainsRe(text, "class TextUIFactory")
1250
# test unsupported package
1251
self.assertRaises(errors.BzrError, osutils.resource_string, 'zzzz',
1253
# test unknown resource
1254
self.assertRaises(IOError, osutils.resource_string, 'bzrlib', 'yyy.xx')