~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-11-17 03:20:35 UTC
  • mfrom: (4792.4.3 456036)
  • Revision ID: pqm@pqm.ubuntu.com-20091117032035-s3sgtlixj1lrminn
(Gordon Tyler) Fix IndexError during 'bzr ignore /' (#456036)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
46
46
import tempfile
47
47
import threading
48
48
import time
49
 
import traceback
50
49
import unittest
51
50
import warnings
52
51
 
53
 
import testtools
54
 
# nb: check this before importing anything else from within it
55
 
_testtools_version = getattr(testtools, '__version__', ())
56
 
if _testtools_version < (0, 9, 2):
57
 
    raise ImportError("need at least testtools 0.9.2: %s is %r"
58
 
        % (testtools.__file__, _testtools_version))
59
 
from testtools import content
60
52
 
61
53
from bzrlib import (
62
54
    branchbuilder,
96
88
from bzrlib.symbol_versioning import (
97
89
    DEPRECATED_PARAMETER,
98
90
    deprecated_function,
99
 
    deprecated_in,
100
91
    deprecated_method,
101
92
    deprecated_passed,
102
93
    )
237
228
                '%d non-main threads were left active in the end.\n'
238
229
                % (TestCase._active_threads - 1))
239
230
 
240
 
    def _extractBenchmarkTime(self, testCase, details=None):
 
231
    def _extractBenchmarkTime(self, testCase):
241
232
        """Add a benchmark time for the current test case."""
242
 
        if details and 'benchtime' in details:
243
 
            return float(''.join(details['benchtime'].iter_bytes()))
244
233
        return getattr(testCase, "_benchtime", None)
245
234
 
246
235
    def _elapsedTestTimeString(self):
280
269
        else:
281
270
            bzr_path = sys.executable
282
271
        self.stream.write(
283
 
            'bzr selftest: %s\n' % (bzr_path,))
 
272
            'testing: %s\n' % (bzr_path,))
284
273
        self.stream.write(
285
274
            '   %s\n' % (
286
275
                    bzrlib.__path__[0],))
331
320
            self.stop()
332
321
        self._cleanupLogFile(test)
333
322
 
334
 
    def addSuccess(self, test, details=None):
 
323
    def addSuccess(self, test):
335
324
        """Tell result that test completed successfully.
336
325
 
337
326
        Called from the TestCase run()
338
327
        """
339
328
        if self._bench_history is not None:
340
 
            benchmark_time = self._extractBenchmarkTime(test, details)
 
329
            benchmark_time = self._extractBenchmarkTime(test)
341
330
            if benchmark_time is not None:
342
331
                self._bench_history.write("%s %s\n" % (
343
332
                    self._formatTime(benchmark_time),
373
362
        self.not_applicable_count += 1
374
363
        self.report_not_applicable(test, reason)
375
364
 
 
365
    def printErrorList(self, flavour, errors):
 
366
        for test, err in errors:
 
367
            self.stream.writeln(self.separator1)
 
368
            self.stream.write("%s: " % flavour)
 
369
            self.stream.writeln(self.getDescription(test))
 
370
            if getattr(test, '_get_log', None) is not None:
 
371
                log_contents = test._get_log()
 
372
                if log_contents:
 
373
                    self.stream.write('\n')
 
374
                    self.stream.write(
 
375
                            ('vvvv[log from %s]' % test.id()).ljust(78,'-'))
 
376
                    self.stream.write('\n')
 
377
                    self.stream.write(log_contents)
 
378
                    self.stream.write('\n')
 
379
                    self.stream.write(
 
380
                            ('^^^^[log from %s]' % test.id()).ljust(78,'-'))
 
381
                    self.stream.write('\n')
 
382
            self.stream.writeln(self.separator2)
 
383
            self.stream.writeln("%s" % err)
 
384
 
376
385
    def _post_mortem(self):
377
386
        """Start a PDB post mortem session."""
378
387
        if os.environ.get('BZR_TEST_PDB', None):
458
467
            a += '%dm%ds' % (runtime / 60, runtime % 60)
459
468
        else:
460
469
            a += '%ds' % runtime
461
 
        total_fail_count = self.error_count + self.failure_count
462
 
        if total_fail_count:
463
 
            a += ', %d failed' % total_fail_count
 
470
        if self.error_count:
 
471
            a += ', %d err' % self.error_count
 
472
        if self.failure_count:
 
473
            a += ', %d fail' % self.failure_count
464
474
        # if self.unsupported:
465
475
        #     a += ', %d missing' % len(self.unsupported)
466
476
        a += ']'
489
499
            ))
490
500
 
491
501
    def report_known_failure(self, test, err):
492
 
        pass
 
502
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
503
            self._test_description(test), err[1]))
493
504
 
494
505
    def report_skip(self, test, reason):
495
506
        pass
522
533
    def report_test_start(self, test):
523
534
        self.count += 1
524
535
        name = self._shortened_test_description(test)
525
 
        width = osutils.terminal_width()
526
 
        if width is not None:
527
 
            # width needs space for 6 char status, plus 1 for slash, plus an
528
 
            # 11-char time string, plus a trailing blank
529
 
            # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on
530
 
            # space
531
 
            self.stream.write(self._ellipsize_to_right(name, width-18))
532
 
        else:
533
 
            self.stream.write(name)
 
536
        # width needs space for 6 char status, plus 1 for slash, plus an
 
537
        # 11-char time string, plus a trailing blank
 
538
        # when NUMBERED_DIRS: plus 5 chars on test number, plus 1 char on space
 
539
        self.stream.write(self._ellipsize_to_right(name,
 
540
                          osutils.terminal_width()-18))
534
541
        self.stream.flush()
535
542
 
536
543
    def _error_summary(self, err):
593
600
            applied left to right - the first element in the list is the 
594
601
            innermost decorator.
595
602
        """
596
 
        # stream may know claim to know to write unicode strings, but in older
597
 
        # pythons this goes sufficiently wrong that it is a bad idea. (
598
 
        # specifically a built in file with encoding 'UTF-8' will still try
599
 
        # to encode using ascii.
600
 
        new_encoding = osutils.get_terminal_encoding()
601
 
        codec = codecs.lookup(new_encoding)
602
 
        if type(codec) is tuple:
603
 
            # Python 2.4
604
 
            encode = codec[0]
605
 
        else:
606
 
            encode = codec.encode
607
 
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
608
 
        stream.encoding = new_encoding
609
603
        self.stream = unittest._WritelnDecorator(stream)
610
604
        self.descriptions = descriptions
611
605
        self.verbosity = verbosity
631
625
        for decorator in self._result_decorators:
632
626
            result = decorator(result)
633
627
            result.stop_early = self.stop_on_failure
 
628
        try:
 
629
            import testtools
 
630
        except ImportError:
 
631
            pass
 
632
        else:
 
633
            if isinstance(test, testtools.ConcurrentTestSuite):
 
634
                # We need to catch bzr specific behaviors
 
635
                result = BZRTransformingResult(result)
634
636
        result.startTestRun()
635
637
        try:
636
638
            test.run(result)
654
656
                        % (type(suite), suite))
655
657
 
656
658
 
657
 
TestSkipped = testtools.testcase.TestSkipped
 
659
class TestSkipped(Exception):
 
660
    """Indicates that a test was intentionally skipped, rather than failing."""
658
661
 
659
662
 
660
663
class TestNotApplicable(TestSkipped):
666
669
    """
667
670
 
668
671
 
669
 
# traceback._some_str fails to format exceptions that have the default
670
 
# __str__ which does an implicit ascii conversion. However, repr() on those
671
 
# objects works, for all that its not quite what the doctor may have ordered.
672
 
def _clever_some_str(value):
673
 
    try:
674
 
        return str(value)
675
 
    except:
676
 
        try:
677
 
            return repr(value).replace('\\n', '\n')
678
 
        except:
679
 
            return '<unprintable %s object>' % type(value).__name__
680
 
 
681
 
traceback._some_str = _clever_some_str
682
 
 
683
 
 
684
 
# deprecated - use self.knownFailure(), or self.expectFailure.
685
 
KnownFailure = testtools.testcase._ExpectedFailure
 
672
class KnownFailure(AssertionError):
 
673
    """Indicates that a test failed in a precisely expected manner.
 
674
 
 
675
    Such failures dont block the whole test suite from passing because they are
 
676
    indicators of partially completed code or of future work. We have an
 
677
    explicit error for them so that we can ensure that they are always visible:
 
678
    KnownFailures are always shown in the output of bzr selftest.
 
679
    """
686
680
 
687
681
 
688
682
class UnavailableFeature(Exception):
764
758
        return NullProgressView()
765
759
 
766
760
 
767
 
class TestCase(testtools.TestCase):
 
761
class TestCase(unittest.TestCase):
768
762
    """Base class for bzr unit tests.
769
763
 
770
764
    Tests that need access to disk resources should subclass
789
783
    _leaking_threads_tests = 0
790
784
    _first_thread_leaker_id = None
791
785
    _log_file_name = None
 
786
    _log_contents = ''
 
787
    _keep_log_file = False
792
788
    # record lsprof data when performing benchmark calls.
793
789
    _gather_lsprof_in_benchmarks = False
 
790
    attrs_to_keep = ('id', '_testMethodName', '_testMethodDoc',
 
791
                     '_log_contents', '_log_file_name', '_benchtime',
 
792
                     '_TestCase__testMethodName', '_TestCase__testMethodDoc',)
794
793
 
795
794
    def __init__(self, methodName='testMethod'):
796
795
        super(TestCase, self).__init__(methodName)
797
796
        self._cleanups = []
 
797
        self._bzr_test_setUp_run = False
 
798
        self._bzr_test_tearDown_run = False
798
799
        self._directory_isolation = True
799
 
        self.exception_handlers.insert(0,
800
 
            (UnavailableFeature, self._do_unsupported_or_skip))
801
 
        self.exception_handlers.insert(0,
802
 
            (TestNotApplicable, self._do_not_applicable))
803
800
 
804
801
    def setUp(self):
805
 
        super(TestCase, self).setUp()
806
 
        for feature in getattr(self, '_test_needs_features', []):
807
 
            self.requireFeature(feature)
808
 
        self._log_contents = None
809
 
        self.addDetail("log", content.Content(content.ContentType("text",
810
 
            "plain", {"charset": "utf8"}),
811
 
            lambda:[self._get_log(keep_log_file=True)]))
 
802
        unittest.TestCase.setUp(self)
 
803
        self._bzr_test_setUp_run = True
812
804
        self._cleanEnvironment()
813
805
        self._silenceUI()
814
806
        self._startLogFile()
1017
1009
        server's urls to be used.
1018
1010
        """
1019
1011
        if backing_server is None:
1020
 
            transport_server.start_server()
 
1012
            transport_server.setUp()
1021
1013
        else:
1022
 
            transport_server.start_server(backing_server)
1023
 
        self.addCleanup(transport_server.stop_server)
 
1014
            transport_server.setUp(backing_server)
 
1015
        self.addCleanup(transport_server.tearDown)
1024
1016
        # Obtain a real transport because if the server supplies a password, it
1025
1017
        # will be hidden from the base on the client side.
1026
1018
        t = get_transport(transport_server.get_url())
1123
1115
        :raises AssertionError: If the expected and actual stat values differ
1124
1116
            other than by atime.
1125
1117
        """
1126
 
        self.assertEqual(expected.st_size, actual.st_size,
1127
 
                         'st_size did not match')
1128
 
        self.assertEqual(expected.st_mtime, actual.st_mtime,
1129
 
                         'st_mtime did not match')
1130
 
        self.assertEqual(expected.st_ctime, actual.st_ctime,
1131
 
                         'st_ctime did not match')
1132
 
        if sys.platform != 'win32':
1133
 
            # On Win32 both 'dev' and 'ino' cannot be trusted. In python2.4 it
1134
 
            # is 'dev' that varies, in python 2.5 (6?) it is st_ino that is
1135
 
            # odd. Regardless we shouldn't actually try to assert anything
1136
 
            # about their values
1137
 
            self.assertEqual(expected.st_dev, actual.st_dev,
1138
 
                             'st_dev did not match')
1139
 
            self.assertEqual(expected.st_ino, actual.st_ino,
1140
 
                             'st_ino did not match')
1141
 
        self.assertEqual(expected.st_mode, actual.st_mode,
1142
 
                         'st_mode did not match')
 
1118
        self.assertEqual(expected.st_size, actual.st_size)
 
1119
        self.assertEqual(expected.st_mtime, actual.st_mtime)
 
1120
        self.assertEqual(expected.st_ctime, actual.st_ctime)
 
1121
        self.assertEqual(expected.st_dev, actual.st_dev)
 
1122
        self.assertEqual(expected.st_ino, actual.st_ino)
 
1123
        self.assertEqual(expected.st_mode, actual.st_mode)
1143
1124
 
1144
1125
    def assertLength(self, length, obj_with_len):
1145
1126
        """Assert that obj_with_len is of length length."""
1293
1274
                m += ": " + msg
1294
1275
            self.fail(m)
1295
1276
 
 
1277
    def expectFailure(self, reason, assertion, *args, **kwargs):
 
1278
        """Invoke a test, expecting it to fail for the given reason.
 
1279
 
 
1280
        This is for assertions that ought to succeed, but currently fail.
 
1281
        (The failure is *expected* but not *wanted*.)  Please be very precise
 
1282
        about the failure you're expecting.  If a new bug is introduced,
 
1283
        AssertionError should be raised, not KnownFailure.
 
1284
 
 
1285
        Frequently, expectFailure should be followed by an opposite assertion.
 
1286
        See example below.
 
1287
 
 
1288
        Intended to be used with a callable that raises AssertionError as the
 
1289
        'assertion' parameter.  args and kwargs are passed to the 'assertion'.
 
1290
 
 
1291
        Raises KnownFailure if the test fails.  Raises AssertionError if the
 
1292
        test succeeds.
 
1293
 
 
1294
        example usage::
 
1295
 
 
1296
          self.expectFailure('Math is broken', self.assertNotEqual, 54,
 
1297
                             dynamic_val)
 
1298
          self.assertEqual(42, dynamic_val)
 
1299
 
 
1300
          This means that a dynamic_val of 54 will cause the test to raise
 
1301
          a KnownFailure.  Once math is fixed and the expectFailure is removed,
 
1302
          only a dynamic_val of 42 will allow the test to pass.  Anything other
 
1303
          than 54 or 42 will cause an AssertionError.
 
1304
        """
 
1305
        try:
 
1306
            assertion(*args, **kwargs)
 
1307
        except AssertionError:
 
1308
            raise KnownFailure(reason)
 
1309
        else:
 
1310
            self.fail('Unexpected success.  Should have failed: %s' % reason)
 
1311
 
1296
1312
    def assertFileEqual(self, content, path):
1297
1313
        """Fail if path does not contain 'content'."""
1298
1314
        self.failUnlessExists(path)
1448
1464
 
1449
1465
        Close the file and delete it, unless setKeepLogfile was called.
1450
1466
        """
1451
 
        if bzrlib.trace._trace_file:
1452
 
            # flush the log file, to get all content
1453
 
            bzrlib.trace._trace_file.flush()
 
1467
        if self._log_file is None:
 
1468
            return
1454
1469
        bzrlib.trace.pop_log_file(self._log_memento)
1455
 
        # Cache the log result and delete the file on disk
1456
 
        self._get_log(False)
 
1470
        self._log_file.close()
 
1471
        self._log_file = None
 
1472
        if not self._keep_log_file:
 
1473
            os.remove(self._log_file_name)
 
1474
            self._log_file_name = None
 
1475
 
 
1476
    def setKeepLogfile(self):
 
1477
        """Make the logfile not be deleted when _finishLogFile is called."""
 
1478
        self._keep_log_file = True
1457
1479
 
1458
1480
    def thisFailsStrictLockCheck(self):
1459
1481
        """It is known that this test would fail with -Dstrict_locks.
1491
1513
            'BZR_PROGRESS_BAR': None,
1492
1514
            'BZR_LOG': None,
1493
1515
            'BZR_PLUGIN_PATH': None,
1494
 
            'BZR_CONCURRENCY': None,
1495
1516
            # Make sure that any text ui tests are consistent regardless of
1496
1517
            # the environment the test case is run in; you may want tests that
1497
1518
            # test other combinations.  'dumb' is a reasonable guess for tests
1499
1520
            'TERM': 'dumb',
1500
1521
            'LINES': '25',
1501
1522
            'COLUMNS': '80',
1502
 
            'BZR_COLUMNS': '80',
1503
1523
            # SSH Agent
1504
1524
            'SSH_AUTH_SOCK': None,
1505
1525
            # Proxies
1550
1570
        else:
1551
1571
            addSkip(self, reason)
1552
1572
 
1553
 
    @staticmethod
1554
 
    def _do_known_failure(self, result, e):
 
1573
    def _do_known_failure(self, result):
1555
1574
        err = sys.exc_info()
1556
1575
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1557
1576
        if addExpectedFailure is not None:
1559
1578
        else:
1560
1579
            result.addSuccess(self)
1561
1580
 
1562
 
    @staticmethod
1563
1581
    def _do_not_applicable(self, result, e):
1564
1582
        if not e.args:
1565
1583
            reason = 'No reason given'
1571
1589
        else:
1572
1590
            self._do_skip(result, reason)
1573
1591
 
1574
 
    @staticmethod
1575
 
    def _do_unsupported_or_skip(self, result, e):
1576
 
        reason = e.args[0]
 
1592
    def _do_unsupported_or_skip(self, result, reason):
1577
1593
        addNotSupported = getattr(result, 'addNotSupported', None)
1578
1594
        if addNotSupported is not None:
1579
1595
            result.addNotSupported(self, reason)
1580
1596
        else:
1581
1597
            self._do_skip(result, reason)
1582
1598
 
 
1599
    def run(self, result=None):
 
1600
        if result is None: result = self.defaultTestResult()
 
1601
        result.startTest(self)
 
1602
        try:
 
1603
            self._run(result)
 
1604
            return result
 
1605
        finally:
 
1606
            result.stopTest(self)
 
1607
 
 
1608
    def _run(self, result):
 
1609
        for feature in getattr(self, '_test_needs_features', []):
 
1610
            if not feature.available():
 
1611
                return self._do_unsupported_or_skip(result, feature)
 
1612
        try:
 
1613
            absent_attr = object()
 
1614
            # Python 2.5
 
1615
            method_name = getattr(self, '_testMethodName', absent_attr)
 
1616
            if method_name is absent_attr:
 
1617
                # Python 2.4
 
1618
                method_name = getattr(self, '_TestCase__testMethodName')
 
1619
            testMethod = getattr(self, method_name)
 
1620
            try:
 
1621
                try:
 
1622
                    self.setUp()
 
1623
                    if not self._bzr_test_setUp_run:
 
1624
                        self.fail(
 
1625
                            "test setUp did not invoke "
 
1626
                            "bzrlib.tests.TestCase's setUp")
 
1627
                except KeyboardInterrupt:
 
1628
                    self._runCleanups()
 
1629
                    raise
 
1630
                except KnownFailure:
 
1631
                    self._do_known_failure(result)
 
1632
                    self.tearDown()
 
1633
                    return
 
1634
                except TestNotApplicable, e:
 
1635
                    self._do_not_applicable(result, e)
 
1636
                    self.tearDown()
 
1637
                    return
 
1638
                except TestSkipped, e:
 
1639
                    self._do_skip(result, e.args[0])
 
1640
                    self.tearDown()
 
1641
                    return result
 
1642
                except UnavailableFeature, e:
 
1643
                    self._do_unsupported_or_skip(result, e.args[0])
 
1644
                    self.tearDown()
 
1645
                    return
 
1646
                except:
 
1647
                    result.addError(self, sys.exc_info())
 
1648
                    self._runCleanups()
 
1649
                    return result
 
1650
 
 
1651
                ok = False
 
1652
                try:
 
1653
                    testMethod()
 
1654
                    ok = True
 
1655
                except KnownFailure:
 
1656
                    self._do_known_failure(result)
 
1657
                except self.failureException:
 
1658
                    result.addFailure(self, sys.exc_info())
 
1659
                except TestNotApplicable, e:
 
1660
                    self._do_not_applicable(result, e)
 
1661
                except TestSkipped, e:
 
1662
                    if not e.args:
 
1663
                        reason = "No reason given."
 
1664
                    else:
 
1665
                        reason = e.args[0]
 
1666
                    self._do_skip(result, reason)
 
1667
                except UnavailableFeature, e:
 
1668
                    self._do_unsupported_or_skip(result, e.args[0])
 
1669
                except KeyboardInterrupt:
 
1670
                    self._runCleanups()
 
1671
                    raise
 
1672
                except:
 
1673
                    result.addError(self, sys.exc_info())
 
1674
 
 
1675
                try:
 
1676
                    self.tearDown()
 
1677
                    if not self._bzr_test_tearDown_run:
 
1678
                        self.fail(
 
1679
                            "test tearDown did not invoke "
 
1680
                            "bzrlib.tests.TestCase's tearDown")
 
1681
                except KeyboardInterrupt:
 
1682
                    self._runCleanups()
 
1683
                    raise
 
1684
                except:
 
1685
                    result.addError(self, sys.exc_info())
 
1686
                    self._runCleanups()
 
1687
                    ok = False
 
1688
                if ok: result.addSuccess(self)
 
1689
                return result
 
1690
            except KeyboardInterrupt:
 
1691
                self._runCleanups()
 
1692
                raise
 
1693
        finally:
 
1694
            saved_attrs = {}
 
1695
            for attr_name in self.attrs_to_keep:
 
1696
                if attr_name in self.__dict__:
 
1697
                    saved_attrs[attr_name] = self.__dict__[attr_name]
 
1698
            self.__dict__ = saved_attrs
 
1699
 
 
1700
    def tearDown(self):
 
1701
        self._runCleanups()
 
1702
        self._log_contents = ''
 
1703
        self._bzr_test_tearDown_run = True
 
1704
        unittest.TestCase.tearDown(self)
 
1705
 
1583
1706
    def time(self, callable, *args, **kwargs):
1584
1707
        """Run callable and accrue the time it takes to the benchmark time.
1585
1708
 
1588
1711
        self._benchcalls.
1589
1712
        """
1590
1713
        if self._benchtime is None:
1591
 
            self.addDetail('benchtime', content.Content(content.ContentType(
1592
 
                "text", "plain"), lambda:[str(self._benchtime)]))
1593
1714
            self._benchtime = 0
1594
1715
        start = time.time()
1595
1716
        try:
1604
1725
        finally:
1605
1726
            self._benchtime += time.time() - start
1606
1727
 
 
1728
    def _runCleanups(self):
 
1729
        """Run registered cleanup functions.
 
1730
 
 
1731
        This should only be called from TestCase.tearDown.
 
1732
        """
 
1733
        # TODO: Perhaps this should keep running cleanups even if
 
1734
        # one of them fails?
 
1735
 
 
1736
        # Actually pop the cleanups from the list so tearDown running
 
1737
        # twice is safe (this happens for skipped tests).
 
1738
        while self._cleanups:
 
1739
            cleanup, args, kwargs = self._cleanups.pop()
 
1740
            cleanup(*args, **kwargs)
 
1741
 
1607
1742
    def log(self, *args):
1608
1743
        mutter(*args)
1609
1744
 
1610
1745
    def _get_log(self, keep_log_file=False):
1611
 
        """Internal helper to get the log from bzrlib.trace for this test.
1612
 
 
1613
 
        Please use self.getDetails, or self.get_log to access this in test case
1614
 
        code.
 
1746
        """Get the log from bzrlib.trace calls from this test.
1615
1747
 
1616
1748
        :param keep_log_file: When True, if the log is still a file on disk
1617
1749
            leave it as a file on disk. When False, if the log is still a file
1619
1751
            self._log_contents.
1620
1752
        :return: A string containing the log.
1621
1753
        """
1622
 
        if self._log_contents is not None:
1623
 
            try:
1624
 
                self._log_contents.decode('utf8')
1625
 
            except UnicodeDecodeError:
1626
 
                unicodestr = self._log_contents.decode('utf8', 'replace')
1627
 
                self._log_contents = unicodestr.encode('utf8')
 
1754
        # flush the log file, to get all content
 
1755
        import bzrlib.trace
 
1756
        if bzrlib.trace._trace_file:
 
1757
            bzrlib.trace._trace_file.flush()
 
1758
        if self._log_contents:
 
1759
            # XXX: this can hardly contain the content flushed above --vila
 
1760
            # 20080128
1628
1761
            return self._log_contents
1629
 
        import bzrlib.trace
1630
 
        if bzrlib.trace._trace_file:
1631
 
            # flush the log file, to get all content
1632
 
            bzrlib.trace._trace_file.flush()
1633
1762
        if self._log_file_name is not None:
1634
1763
            logfile = open(self._log_file_name)
1635
1764
            try:
1636
1765
                log_contents = logfile.read()
1637
1766
            finally:
1638
1767
                logfile.close()
1639
 
            try:
1640
 
                log_contents.decode('utf8')
1641
 
            except UnicodeDecodeError:
1642
 
                unicodestr = log_contents.decode('utf8', 'replace')
1643
 
                log_contents = unicodestr.encode('utf8')
1644
1768
            if not keep_log_file:
1645
 
                self._log_file.close()
1646
 
                self._log_file = None
1647
 
                # Permit multiple calls to get_log until we clean it up in
1648
 
                # finishLogFile
1649
1769
                self._log_contents = log_contents
1650
1770
                try:
1651
1771
                    os.remove(self._log_file_name)
1655
1775
                                             ' %r\n' % self._log_file_name))
1656
1776
                    else:
1657
1777
                        raise
1658
 
                self._log_file_name = None
1659
1778
            return log_contents
1660
1779
        else:
1661
 
            return "No log file content and no log file name."
1662
 
 
1663
 
    def get_log(self):
1664
 
        """Get a unicode string containing the log from bzrlib.trace.
1665
 
 
1666
 
        Undecodable characters are replaced.
1667
 
        """
1668
 
        return u"".join(self.getDetails()['log'].iter_text())
 
1780
            return "DELETED log file to reduce memory footprint"
1669
1781
 
1670
1782
    def requireFeature(self, feature):
1671
1783
        """This test requires a specific feature is available.
1713
1825
            os.chdir(working_dir)
1714
1826
 
1715
1827
        try:
1716
 
            try:
1717
 
                result = self.apply_redirected(ui.ui_factory.stdin,
1718
 
                    stdout, stderr,
1719
 
                    bzrlib.commands.run_bzr_catch_user_errors,
1720
 
                    args)
1721
 
            except KeyboardInterrupt:
1722
 
                # Reraise KeyboardInterrupt with contents of redirected stdout
1723
 
                # and stderr as arguments, for tests which are interested in
1724
 
                # stdout and stderr and are expecting the exception.
1725
 
                out = stdout.getvalue()
1726
 
                err = stderr.getvalue()
1727
 
                if out:
1728
 
                    self.log('output:\n%r', out)
1729
 
                if err:
1730
 
                    self.log('errors:\n%r', err)
1731
 
                raise KeyboardInterrupt(out, err)
 
1828
            result = self.apply_redirected(ui.ui_factory.stdin,
 
1829
                stdout, stderr,
 
1830
                bzrlib.commands.run_bzr_catch_user_errors,
 
1831
                args)
1732
1832
        finally:
1733
1833
            logger.removeHandler(handler)
1734
1834
            ui.ui_factory = old_ui_factory
2282
2382
            # recreate a new one or all the followng tests will fail.
2283
2383
            # If you need to inspect its content uncomment the following line
2284
2384
            # import pdb; pdb.set_trace()
2285
 
            _rmtree_temp_dir(root + '/.bzr', test_id=self.id())
 
2385
            _rmtree_temp_dir(root + '/.bzr')
2286
2386
            self._create_safety_net()
2287
2387
            raise AssertionError('%s/.bzr should not be modified' % root)
2288
2388
 
2364
2464
        return branchbuilder.BranchBuilder(branch=branch)
2365
2465
 
2366
2466
    def overrideEnvironmentForTesting(self):
2367
 
        test_home_dir = self.test_home_dir
2368
 
        if isinstance(test_home_dir, unicode):
2369
 
            test_home_dir = test_home_dir.encode(sys.getfilesystemencoding())
2370
 
        os.environ['HOME'] = test_home_dir
2371
 
        os.environ['BZR_HOME'] = test_home_dir
 
2467
        os.environ['HOME'] = self.test_home_dir
 
2468
        os.environ['BZR_HOME'] = self.test_home_dir
2372
2469
 
2373
2470
    def setUp(self):
2374
2471
        super(TestCaseWithMemoryTransport, self).setUp()
2480
2577
 
2481
2578
    def deleteTestDir(self):
2482
2579
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2483
 
        _rmtree_temp_dir(self.test_base_dir, test_id=self.id())
 
2580
        _rmtree_temp_dir(self.test_base_dir)
2484
2581
 
2485
2582
    def build_tree(self, shape, line_endings='binary', transport=None):
2486
2583
        """Build a test tree according to a pattern.
3090
3187
        if self.randomised:
3091
3188
            return iter(self._tests)
3092
3189
        self.randomised = True
3093
 
        self.stream.write("Randomizing test order using seed %s\n\n" %
 
3190
        self.stream.writeln("Randomizing test order using seed %s\n" %
3094
3191
            (self.actual_seed()))
3095
3192
        # Initialise the random number generator.
3096
3193
        random.seed(self.actual_seed())
3153
3250
    concurrency = osutils.local_concurrency()
3154
3251
    result = []
3155
3252
    from subunit import TestProtocolClient, ProtocolTestCase
3156
 
    from subunit.test_results import AutoTimingTestResultDecorator
3157
3253
    class TestInOtherProcess(ProtocolTestCase):
3158
3254
        # Should be in subunit, I think. RBC.
3159
3255
        def __init__(self, stream, pid):
3182
3278
                sys.stdin.close()
3183
3279
                sys.stdin = None
3184
3280
                stream = os.fdopen(c2pwrite, 'wb', 1)
3185
 
                subunit_result = AutoTimingTestResultDecorator(
 
3281
                subunit_result = BzrAutoTimingTestResultDecorator(
3186
3282
                    TestProtocolClient(stream))
3187
3283
                process_suite.run(subunit_result)
3188
3284
            finally:
3225
3321
        if not os.path.isfile(bzr_path):
3226
3322
            # We are probably installed. Assume sys.argv is the right file
3227
3323
            bzr_path = sys.argv[0]
3228
 
        bzr_path = [bzr_path]
3229
 
        if sys.platform == "win32":
3230
 
            # if we're on windows, we can't execute the bzr script directly
3231
 
            bzr_path = [sys.executable] + bzr_path
3232
3324
        fd, test_list_file_name = tempfile.mkstemp()
3233
3325
        test_list_file = os.fdopen(fd, 'wb', 1)
3234
3326
        for test in process_tests:
3235
3327
            test_list_file.write(test.id() + '\n')
3236
3328
        test_list_file.close()
3237
3329
        try:
3238
 
            argv = bzr_path + ['selftest', '--load-list', test_list_file_name,
 
3330
            argv = [bzr_path, 'selftest', '--load-list', test_list_file_name,
3239
3331
                '--subunit']
3240
3332
            if '--no-plugins' in sys.argv:
3241
3333
                argv.append('--no-plugins')
3280
3372
 
3281
3373
    def addFailure(self, test, err):
3282
3374
        self.result.addFailure(test, err)
3283
 
ForwardingResult = testtools.ExtendedToOriginalDecorator
 
3375
 
 
3376
 
 
3377
class BZRTransformingResult(ForwardingResult):
 
3378
 
 
3379
    def addError(self, test, err):
 
3380
        feature = self._error_looks_like('UnavailableFeature: ', err)
 
3381
        if feature is not None:
 
3382
            self.result.addNotSupported(test, feature)
 
3383
        else:
 
3384
            self.result.addError(test, err)
 
3385
 
 
3386
    def addFailure(self, test, err):
 
3387
        known = self._error_looks_like('KnownFailure: ', err)
 
3388
        if known is not None:
 
3389
            self.result.addExpectedFailure(test,
 
3390
                [KnownFailure, KnownFailure(known), None])
 
3391
        else:
 
3392
            self.result.addFailure(test, err)
 
3393
 
 
3394
    def _error_looks_like(self, prefix, err):
 
3395
        """Deserialize exception and returns the stringify value."""
 
3396
        import subunit
 
3397
        value = None
 
3398
        typ, exc, _ = err
 
3399
        if isinstance(exc, subunit.RemoteException):
 
3400
            # stringify the exception gives access to the remote traceback
 
3401
            # We search the last line for 'prefix'
 
3402
            lines = str(exc).split('\n')
 
3403
            while lines and not lines[-1]:
 
3404
                lines.pop(-1)
 
3405
            if lines:
 
3406
                if lines[-1].startswith(prefix):
 
3407
                    value = lines[-1][len(prefix):]
 
3408
        return value
 
3409
 
 
3410
 
 
3411
try:
 
3412
    from subunit.test_results import AutoTimingTestResultDecorator
 
3413
    # Expected failure should be seen as a success not a failure Once subunit
 
3414
    # provide native support for that, BZRTransformingResult and this class
 
3415
    # will become useless.
 
3416
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
 
3417
 
 
3418
        def addExpectedFailure(self, test, err):
 
3419
            self._before_event()
 
3420
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
 
3421
                                    test, err)
 
3422
except ImportError:
 
3423
    # Let's just define a no-op decorator
 
3424
    BzrAutoTimingTestResultDecorator = lambda x:x
3284
3425
 
3285
3426
 
3286
3427
class ProfileResult(ForwardingResult):
3569
3710
        'bzrlib.tests.per_inventory',
3570
3711
        'bzrlib.tests.per_interbranch',
3571
3712
        'bzrlib.tests.per_lock',
3572
 
        'bzrlib.tests.per_merger',
3573
3713
        'bzrlib.tests.per_transport',
3574
3714
        'bzrlib.tests.per_tree',
3575
3715
        'bzrlib.tests.per_pack_repository',
3580
3720
        'bzrlib.tests.per_versionedfile',
3581
3721
        'bzrlib.tests.per_workingtree',
3582
3722
        'bzrlib.tests.test__annotator',
3583
 
        'bzrlib.tests.test__bencode',
3584
3723
        'bzrlib.tests.test__chk_map',
3585
3724
        'bzrlib.tests.test__dirstate_helpers',
3586
3725
        'bzrlib.tests.test__groupcompress',
3594
3733
        'bzrlib.tests.test_api',
3595
3734
        'bzrlib.tests.test_atomicfile',
3596
3735
        'bzrlib.tests.test_bad_files',
 
3736
        'bzrlib.tests.test_bencode',
3597
3737
        'bzrlib.tests.test_bisect_multi',
3598
3738
        'bzrlib.tests.test_branch',
3599
3739
        'bzrlib.tests.test_branchbuilder',
3961
4101
    return new_test
3962
4102
 
3963
4103
 
3964
 
def permute_tests_for_extension(standard_tests, loader, py_module_name,
3965
 
                                ext_module_name):
3966
 
    """Helper for permutating tests against an extension module.
3967
 
 
3968
 
    This is meant to be used inside a modules 'load_tests()' function. It will
3969
 
    create 2 scenarios, and cause all tests in the 'standard_tests' to be run
3970
 
    against both implementations. Setting 'test.module' to the appropriate
3971
 
    module. See bzrlib.tests.test__chk_map.load_tests as an example.
3972
 
 
3973
 
    :param standard_tests: A test suite to permute
3974
 
    :param loader: A TestLoader
3975
 
    :param py_module_name: The python path to a python module that can always
3976
 
        be loaded, and will be considered the 'python' implementation. (eg
3977
 
        'bzrlib._chk_map_py')
3978
 
    :param ext_module_name: The python path to an extension module. If the
3979
 
        module cannot be loaded, a single test will be added, which notes that
3980
 
        the module is not available. If it can be loaded, all standard_tests
3981
 
        will be run against that module.
3982
 
    :return: (suite, feature) suite is a test-suite that has all the permuted
3983
 
        tests. feature is the Feature object that can be used to determine if
3984
 
        the module is available.
3985
 
    """
3986
 
 
3987
 
    py_module = __import__(py_module_name, {}, {}, ['NO_SUCH_ATTRIB'])
3988
 
    scenarios = [
3989
 
        ('python', {'module': py_module}),
3990
 
    ]
3991
 
    suite = loader.suiteClass()
3992
 
    feature = ModuleAvailableFeature(ext_module_name)
3993
 
    if feature.available():
3994
 
        scenarios.append(('C', {'module': feature.module}))
3995
 
    else:
3996
 
        # the compiled module isn't available, so we add a failing test
3997
 
        class FailWithoutFeature(TestCase):
3998
 
            def test_fail(self):
3999
 
                self.requireFeature(feature)
4000
 
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
4001
 
    result = multiply_tests(standard_tests, scenarios, suite)
4002
 
    return result, feature
4003
 
 
4004
 
 
4005
 
def _rmtree_temp_dir(dirname, test_id=None):
 
4104
def _rmtree_temp_dir(dirname):
4006
4105
    # If LANG=C we probably have created some bogus paths
4007
4106
    # which rmtree(unicode) will fail to delete
4008
4107
    # so make sure we are using rmtree(str) to delete everything
4020
4119
        # We don't want to fail here because some useful display will be lost
4021
4120
        # otherwise. Polluting the tmp dir is bad, but not giving all the
4022
4121
        # possible info to the test runner is even worse.
4023
 
        if test_id != None:
4024
 
            ui.ui_factory.clear_term()
4025
 
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4026
4122
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4027
4123
                         % (os.path.basename(dirname), e))
4028
4124
 
4112
4208
UnicodeFilenameFeature = _UnicodeFilenameFeature()
4113
4209
 
4114
4210
 
4115
 
class _CompatabilityThunkFeature(Feature):
4116
 
    """This feature is just a thunk to another feature.
4117
 
 
4118
 
    It issues a deprecation warning if it is accessed, to let you know that you
4119
 
    should really use a different feature.
4120
 
    """
4121
 
 
4122
 
    def __init__(self, module, name, this_name, dep_version):
4123
 
        super(_CompatabilityThunkFeature, self).__init__()
4124
 
        self._module = module
4125
 
        self._name = name
4126
 
        self._this_name = this_name
4127
 
        self._dep_version = dep_version
4128
 
        self._feature = None
4129
 
 
4130
 
    def _ensure(self):
4131
 
        if self._feature is None:
4132
 
            msg = (self._dep_version % self._this_name) + (
4133
 
                   ' Use %s.%s instead.' % (self._module, self._name))
4134
 
            symbol_versioning.warn(msg, DeprecationWarning)
4135
 
            mod = __import__(self._module, {}, {}, [self._name])
4136
 
            self._feature = getattr(mod, self._name)
4137
 
 
4138
 
    def _probe(self):
4139
 
        self._ensure()
4140
 
        return self._feature._probe()
4141
 
 
4142
 
 
4143
 
class ModuleAvailableFeature(Feature):
4144
 
    """This is a feature than describes a module we want to be available.
4145
 
 
4146
 
    Declare the name of the module in __init__(), and then after probing, the
4147
 
    module will be available as 'self.module'.
4148
 
 
4149
 
    :ivar module: The module if it is available, else None.
4150
 
    """
4151
 
 
4152
 
    def __init__(self, module_name):
4153
 
        super(ModuleAvailableFeature, self).__init__()
4154
 
        self.module_name = module_name
4155
 
 
4156
 
    def _probe(self):
4157
 
        try:
4158
 
            self._module = __import__(self.module_name, {}, {}, [''])
4159
 
            return True
4160
 
        except ImportError:
4161
 
            return False
4162
 
 
4163
 
    @property
4164
 
    def module(self):
4165
 
        if self.available(): # Make sure the probe has been done
4166
 
            return self._module
4167
 
        return None
4168
 
    
4169
 
    def feature_name(self):
4170
 
        return self.module_name
4171
 
 
4172
 
 
4173
 
# This is kept here for compatibility, it is recommended to use
4174
 
# 'bzrlib.tests.feature.paramiko' instead
4175
 
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4176
 
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
4177
 
 
4178
 
 
4179
4211
def probe_unicode_in_user_encoding():
4180
4212
    """Try to encode several unicode strings to use in unicode-aware tests.
4181
4213
    Return first successfull match.
4230
4262
HTTPSServerFeature = _HTTPSServerFeature()
4231
4263
 
4232
4264
 
 
4265
class _ParamikoFeature(Feature):
 
4266
    """Is paramiko available?"""
 
4267
 
 
4268
    def _probe(self):
 
4269
        try:
 
4270
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4271
            return True
 
4272
        except errors.ParamikoNotPresent:
 
4273
            return False
 
4274
 
 
4275
    def feature_name(self):
 
4276
        return "Paramiko"
 
4277
 
 
4278
 
 
4279
ParamikoFeature = _ParamikoFeature()
 
4280
 
 
4281
 
4233
4282
class _UnicodeFilename(Feature):
4234
4283
    """Does the filesystem support Unicode filenames?"""
4235
4284
 
4340
4389
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4341
4390
 
4342
4391
 
4343
 
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4344
 
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4345
 
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
 
4392
class _SubUnitFeature(Feature):
 
4393
    """Check if subunit is available."""
 
4394
 
 
4395
    def _probe(self):
 
4396
        try:
 
4397
            import subunit
 
4398
            return True
 
4399
        except ImportError:
 
4400
            return False
 
4401
 
 
4402
    def feature_name(self):
 
4403
        return 'subunit'
 
4404
 
 
4405
SubUnitFeature = _SubUnitFeature()
4346
4406
# Only define SubUnitBzrRunner if subunit is available.
4347
4407
try:
4348
4408
    from subunit import TestProtocolClient
4349
 
    from subunit.test_results import AutoTimingTestResultDecorator
4350
4409
    class SubUnitBzrRunner(TextTestRunner):
4351
4410
        def run(self, test):
4352
 
            result = AutoTimingTestResultDecorator(
 
4411
            result = BzrAutoTimingTestResultDecorator(
4353
4412
                TestProtocolClient(self.stream))
4354
4413
            test.run(result)
4355
4414
            return result