~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-11-18 06:18:14 UTC
  • mfrom: (4634.97.5 doc-2.0)
  • Revision ID: pqm@pqm.ubuntu.com-20091118061814-695imx80olc79o7l
(mbp, trivial) additional doc building fix

Show diffs side-by-side

added added

removed removed

Lines of Context:
53
53
from bzrlib import (
54
54
    branchbuilder,
55
55
    bzrdir,
 
56
    chk_map,
 
57
    config,
56
58
    debug,
57
59
    errors,
58
60
    hooks,
90
92
    deprecated_passed,
91
93
    )
92
94
import bzrlib.trace
93
 
from bzrlib.transport import get_transport
 
95
from bzrlib.transport import get_transport, pathfilter
94
96
import bzrlib.transport
95
97
from bzrlib.transport.local import LocalURLServer
96
98
from bzrlib.transport.memory import MemoryServer
221
223
                '%s is leaking threads among %d leaking tests.\n' % (
222
224
                TestCase._first_thread_leaker_id,
223
225
                TestCase._leaking_threads_tests))
 
226
            # We don't report the main thread as an active one.
 
227
            self.stream.write(
 
228
                '%d non-main threads were left active in the end.\n'
 
229
                % (TestCase._active_threads - 1))
224
230
 
225
231
    def _extractBenchmarkTime(self, testCase):
226
232
        """Add a benchmark time for the current test case."""
292
298
        Called from the TestCase run() method when the test
293
299
        fails with an unexpected error.
294
300
        """
295
 
        self._testConcluded(test)
296
 
        if isinstance(err[1], TestNotApplicable):
297
 
            return self._addNotApplicable(test, err)
298
 
        elif isinstance(err[1], UnavailableFeature):
299
 
            return self.addNotSupported(test, err[1].args[0])
300
 
        else:
301
 
            unittest.TestResult.addError(self, test, err)
302
 
            self.error_count += 1
303
 
            self.report_error(test, err)
304
 
            if self.stop_early:
305
 
                self.stop()
306
 
            self._cleanupLogFile(test)
 
301
        self._post_mortem()
 
302
        unittest.TestResult.addError(self, test, err)
 
303
        self.error_count += 1
 
304
        self.report_error(test, err)
 
305
        if self.stop_early:
 
306
            self.stop()
 
307
        self._cleanupLogFile(test)
307
308
 
308
309
    def addFailure(self, test, err):
309
310
        """Tell result that test failed.
311
312
        Called from the TestCase run() method when the test
312
313
        fails because e.g. an assert() method failed.
313
314
        """
314
 
        self._testConcluded(test)
315
 
        if isinstance(err[1], KnownFailure):
316
 
            return self._addKnownFailure(test, err)
317
 
        else:
318
 
            unittest.TestResult.addFailure(self, test, err)
319
 
            self.failure_count += 1
320
 
            self.report_failure(test, err)
321
 
            if self.stop_early:
322
 
                self.stop()
323
 
            self._cleanupLogFile(test)
 
315
        self._post_mortem()
 
316
        unittest.TestResult.addFailure(self, test, err)
 
317
        self.failure_count += 1
 
318
        self.report_failure(test, err)
 
319
        if self.stop_early:
 
320
            self.stop()
 
321
        self._cleanupLogFile(test)
324
322
 
325
323
    def addSuccess(self, test):
326
324
        """Tell result that test completed successfully.
327
325
 
328
326
        Called from the TestCase run()
329
327
        """
330
 
        self._testConcluded(test)
331
328
        if self._bench_history is not None:
332
329
            benchmark_time = self._extractBenchmarkTime(test)
333
330
            if benchmark_time is not None:
339
336
        unittest.TestResult.addSuccess(self, test)
340
337
        test._log_contents = ''
341
338
 
342
 
    def _testConcluded(self, test):
343
 
        """Common code when a test has finished.
344
 
 
345
 
        Called regardless of whether it succeded, failed, etc.
346
 
        """
347
 
        pass
348
 
 
349
 
    def _addKnownFailure(self, test, err):
 
339
    def addExpectedFailure(self, test, err):
350
340
        self.known_failure_count += 1
351
341
        self.report_known_failure(test, err)
352
342
 
354
344
        """The test will not be run because of a missing feature.
355
345
        """
356
346
        # this can be called in two different ways: it may be that the
357
 
        # test started running, and then raised (through addError)
 
347
        # test started running, and then raised (through requireFeature)
358
348
        # UnavailableFeature.  Alternatively this method can be called
359
 
        # while probing for features before running the tests; in that
360
 
        # case we will see startTest and stopTest, but the test will never
361
 
        # actually run.
 
349
        # while probing for features before running the test code proper; in
 
350
        # that case we will see startTest and stopTest, but the test will
 
351
        # never actually run.
362
352
        self.unsupported.setdefault(str(feature), 0)
363
353
        self.unsupported[str(feature)] += 1
364
354
        self.report_unsupported(test, feature)
368
358
        self.skip_count += 1
369
359
        self.report_skip(test, reason)
370
360
 
371
 
    def _addNotApplicable(self, test, skip_excinfo):
372
 
        if isinstance(skip_excinfo[1], TestNotApplicable):
373
 
            self.not_applicable_count += 1
374
 
            self.report_not_applicable(test, skip_excinfo)
375
 
        try:
376
 
            test.tearDown()
377
 
        except KeyboardInterrupt:
378
 
            raise
379
 
        except:
380
 
            self.addError(test, test.exc_info())
381
 
        else:
382
 
            # seems best to treat this as success from point-of-view of unittest
383
 
            # -- it actually does nothing so it barely matters :)
384
 
            unittest.TestResult.addSuccess(self, test)
385
 
            test._log_contents = ''
 
361
    def addNotApplicable(self, test, reason):
 
362
        self.not_applicable_count += 1
 
363
        self.report_not_applicable(test, reason)
386
364
 
387
365
    def printErrorList(self, flavour, errors):
388
366
        for test, err in errors:
404
382
            self.stream.writeln(self.separator2)
405
383
            self.stream.writeln("%s" % err)
406
384
 
 
385
    def _post_mortem(self):
 
386
        """Start a PDB post mortem session."""
 
387
        if os.environ.get('BZR_TEST_PDB', None):
 
388
            import pdb;pdb.post_mortem()
 
389
 
407
390
    def progress(self, offset, whence):
408
391
        """The test is adjusting the count of tests to run."""
409
392
        if whence == SUBUNIT_SEEK_SET:
488
471
            a += ', %d err' % self.error_count
489
472
        if self.failure_count:
490
473
            a += ', %d fail' % self.failure_count
491
 
        if self.unsupported:
492
 
            a += ', %d missing' % len(self.unsupported)
 
474
        # if self.unsupported:
 
475
        #     a += ', %d missing' % len(self.unsupported)
493
476
        a += ']'
494
477
        return a
495
478
 
504
487
        return self._shortened_test_description(test)
505
488
 
506
489
    def report_error(self, test, err):
507
 
        self.pb.note('ERROR: %s\n    %s\n',
 
490
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
508
491
            self._test_description(test),
509
492
            err[1],
510
 
            )
 
493
            ))
511
494
 
512
495
    def report_failure(self, test, err):
513
 
        self.pb.note('FAIL: %s\n    %s\n',
 
496
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
514
497
            self._test_description(test),
515
498
            err[1],
516
 
            )
 
499
            ))
517
500
 
518
501
    def report_known_failure(self, test, err):
519
 
        self.pb.note('XFAIL: %s\n%s\n',
520
 
            self._test_description(test), err[1])
 
502
        ui.ui_factory.note('XFAIL: %s\n%s\n' % (
 
503
            self._test_description(test), err[1]))
521
504
 
522
505
    def report_skip(self, test, reason):
523
506
        pass
524
507
 
525
 
    def report_not_applicable(self, test, skip_excinfo):
 
508
    def report_not_applicable(self, test, reason):
526
509
        pass
527
510
 
528
511
    def report_unsupported(self, test, feature):
589
572
        self.stream.writeln(' SKIP %s\n%s'
590
573
                % (self._testTimeString(test), reason))
591
574
 
592
 
    def report_not_applicable(self, test, skip_excinfo):
593
 
        self.stream.writeln('  N/A %s\n%s'
594
 
                % (self._testTimeString(test),
595
 
                   self._error_summary(skip_excinfo)))
 
575
    def report_not_applicable(self, test, reason):
 
576
        self.stream.writeln('  N/A %s\n    %s'
 
577
                % (self._testTimeString(test), reason))
596
578
 
597
579
    def report_unsupported(self, test, feature):
598
580
        """test cannot be run because feature is missing."""
700
682
class UnavailableFeature(Exception):
701
683
    """A feature required for this test was not available.
702
684
 
 
685
    This can be considered a specialised form of SkippedTest.
 
686
 
703
687
    The feature should be used to construct the exception.
704
688
    """
705
689
 
812
796
        self._cleanups = []
813
797
        self._bzr_test_setUp_run = False
814
798
        self._bzr_test_tearDown_run = False
 
799
        self._directory_isolation = True
815
800
 
816
801
    def setUp(self):
817
802
        unittest.TestCase.setUp(self)
822
807
        self._benchcalls = []
823
808
        self._benchtime = None
824
809
        self._clear_hooks()
 
810
        self._track_transports()
825
811
        self._track_locks()
826
812
        self._clear_debug_flags()
827
813
        TestCase._active_threads = threading.activeCount()
836
822
        active = threading.activeCount()
837
823
        leaked_threads = active - TestCase._active_threads
838
824
        TestCase._active_threads = active
839
 
        if leaked_threads:
 
825
        # If some tests make the number of threads *decrease*, we'll consider
 
826
        # that they are just observing old threads dieing, not agressively kill
 
827
        # random threads. So we don't report these tests as leaking. The risk
 
828
        # is that we have false positives that way (the test see 2 threads
 
829
        # going away but leak one) but it seems less likely than the actual
 
830
        # false positives (the test see threads going away and does not leak).
 
831
        if leaked_threads > 0:
840
832
            TestCase._leaking_threads_tests += 1
841
833
            if TestCase._first_thread_leaker_id is None:
842
834
                TestCase._first_thread_leaker_id = self.id()
868
860
        # this hook should always be installed
869
861
        request._install_hook()
870
862
 
 
863
    def disable_directory_isolation(self):
 
864
        """Turn off directory isolation checks."""
 
865
        self._directory_isolation = False
 
866
 
 
867
    def enable_directory_isolation(self):
 
868
        """Enable directory isolation checks."""
 
869
        self._directory_isolation = True
 
870
 
871
871
    def _silenceUI(self):
872
872
        """Turn off UI for duration of test"""
873
873
        # by default the UI is off; tests can turn it on if they want it.
928
928
    def _lock_broken(self, result):
929
929
        self._lock_actions.append(('broken', result))
930
930
 
 
931
    def permit_dir(self, name):
 
932
        """Permit a directory to be used by this test. See permit_url."""
 
933
        name_transport = get_transport(name)
 
934
        self.permit_url(name)
 
935
        self.permit_url(name_transport.base)
 
936
 
 
937
    def permit_url(self, url):
 
938
        """Declare that url is an ok url to use in this test.
 
939
        
 
940
        Do this for memory transports, temporary test directory etc.
 
941
        
 
942
        Do not do this for the current working directory, /tmp, or any other
 
943
        preexisting non isolated url.
 
944
        """
 
945
        if not url.endswith('/'):
 
946
            url += '/'
 
947
        self._bzr_selftest_roots.append(url)
 
948
 
 
949
    def permit_source_tree_branch_repo(self):
 
950
        """Permit the source tree bzr is running from to be opened.
 
951
 
 
952
        Some code such as bzrlib.version attempts to read from the bzr branch
 
953
        that bzr is executing from (if any). This method permits that directory
 
954
        to be used in the test suite.
 
955
        """
 
956
        path = self.get_source_path()
 
957
        self.record_directory_isolation()
 
958
        try:
 
959
            try:
 
960
                workingtree.WorkingTree.open(path)
 
961
            except (errors.NotBranchError, errors.NoWorkingTree):
 
962
                return
 
963
        finally:
 
964
            self.enable_directory_isolation()
 
965
 
 
966
    def _preopen_isolate_transport(self, transport):
 
967
        """Check that all transport openings are done in the test work area."""
 
968
        while isinstance(transport, pathfilter.PathFilteringTransport):
 
969
            # Unwrap pathfiltered transports
 
970
            transport = transport.server.backing_transport.clone(
 
971
                transport._filter('.'))
 
972
        url = transport.base
 
973
        # ReadonlySmartTCPServer_for_testing decorates the backing transport
 
974
        # urls it is given by prepending readonly+. This is appropriate as the
 
975
        # client shouldn't know that the server is readonly (or not readonly).
 
976
        # We could register all servers twice, with readonly+ prepending, but
 
977
        # that makes for a long list; this is about the same but easier to
 
978
        # read.
 
979
        if url.startswith('readonly+'):
 
980
            url = url[len('readonly+'):]
 
981
        self._preopen_isolate_url(url)
 
982
 
 
983
    def _preopen_isolate_url(self, url):
 
984
        if not self._directory_isolation:
 
985
            return
 
986
        if self._directory_isolation == 'record':
 
987
            self._bzr_selftest_roots.append(url)
 
988
            return
 
989
        # This prevents all transports, including e.g. sftp ones backed on disk
 
990
        # from working unless they are explicitly granted permission. We then
 
991
        # depend on the code that sets up test transports to check that they are
 
992
        # appropriately isolated and enable their use by calling
 
993
        # self.permit_transport()
 
994
        if not osutils.is_inside_any(self._bzr_selftest_roots, url):
 
995
            raise errors.BzrError("Attempt to escape test isolation: %r %r"
 
996
                % (url, self._bzr_selftest_roots))
 
997
 
 
998
    def record_directory_isolation(self):
 
999
        """Gather accessed directories to permit later access.
 
1000
        
 
1001
        This is used for tests that access the branch bzr is running from.
 
1002
        """
 
1003
        self._directory_isolation = "record"
 
1004
 
931
1005
    def start_server(self, transport_server, backing_server=None):
932
1006
        """Start transport_server for this test.
933
1007
 
939
1013
        else:
940
1014
            transport_server.setUp(backing_server)
941
1015
        self.addCleanup(transport_server.tearDown)
 
1016
        # Obtain a real transport because if the server supplies a password, it
 
1017
        # will be hidden from the base on the client side.
 
1018
        t = get_transport(transport_server.get_url())
 
1019
        # Some transport servers effectively chroot the backing transport;
 
1020
        # others like SFTPServer don't - users of the transport can walk up the
 
1021
        # transport to read the entire backing transport. This wouldn't matter
 
1022
        # except that the workdir tests are given - and that they expect the
 
1023
        # server's url to point at - is one directory under the safety net. So
 
1024
        # Branch operations into the transport will attempt to walk up one
 
1025
        # directory. Chrooting all servers would avoid this but also mean that
 
1026
        # we wouldn't be testing directly against non-root urls. Alternatively
 
1027
        # getting the test framework to start the server with a backing server
 
1028
        # at the actual safety net directory would work too, but this then
 
1029
        # means that the self.get_url/self.get_transport methods would need
 
1030
        # to transform all their results. On balance its cleaner to handle it
 
1031
        # here, and permit a higher url when we have one of these transports.
 
1032
        if t.base.endswith('/work/'):
 
1033
            # we have safety net/test root/work
 
1034
            t = t.clone('../..')
 
1035
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1036
            # The smart server adds a path similar to work, which is traversed
 
1037
            # up from by the client. But the server is chrooted - the actual
 
1038
            # backing transport is not escaped from, and VFS requests to the
 
1039
            # root will error (because they try to escape the chroot).
 
1040
            t2 = t.clone('..')
 
1041
            while t2.base != t.base:
 
1042
                t = t2
 
1043
                t2 = t.clone('..')
 
1044
        self.permit_url(t.base)
 
1045
 
 
1046
    def _track_transports(self):
 
1047
        """Install checks for transport usage."""
 
1048
        # TestCase has no safe place it can write to.
 
1049
        self._bzr_selftest_roots = []
 
1050
        # Currently the easiest way to be sure that nothing is going on is to
 
1051
        # hook into bzr dir opening. This leaves a small window of error for
 
1052
        # transport tests, but they are well known, and we can improve on this
 
1053
        # step.
 
1054
        bzrdir.BzrDir.hooks.install_named_hook("pre_open",
 
1055
            self._preopen_isolate_transport, "Check bzr directories are safe.")
942
1056
 
943
1057
    def _ndiff_strings(self, a, b):
944
1058
        """Return ndiff between two strings containing lines.
982
1096
            return
983
1097
        if message is None:
984
1098
            message = "texts not equal:\n"
 
1099
        if a + '\n' == b:
 
1100
            message = 'first string is missing a final newline.\n'
985
1101
        if a == b + '\n':
986
 
            message = 'first string is missing a final newline.\n'
987
 
        if a + '\n' == b:
988
1102
            message = 'second string is missing a final newline.\n'
989
1103
        raise AssertionError(message +
990
1104
                             self._ndiff_strings(a, b))
1014
1128
            self.fail("Incorrect length: wanted %d, got %d for %r" % (
1015
1129
                length, len(obj_with_len), obj_with_len))
1016
1130
 
 
1131
    def assertLogsError(self, exception_class, func, *args, **kwargs):
 
1132
        """Assert that func(*args, **kwargs) quietly logs a specific exception.
 
1133
        """
 
1134
        from bzrlib import trace
 
1135
        captured = []
 
1136
        orig_log_exception_quietly = trace.log_exception_quietly
 
1137
        try:
 
1138
            def capture():
 
1139
                orig_log_exception_quietly()
 
1140
                captured.append(sys.exc_info())
 
1141
            trace.log_exception_quietly = capture
 
1142
            func(*args, **kwargs)
 
1143
        finally:
 
1144
            trace.log_exception_quietly = orig_log_exception_quietly
 
1145
        self.assertLength(1, captured)
 
1146
        err = captured[0][1]
 
1147
        self.assertIsInstance(err, exception_class)
 
1148
        return err
 
1149
 
1017
1150
    def assertPositive(self, val):
1018
1151
        """Assert that val is greater than 0."""
1019
1152
        self.assertTrue(val > 0, 'expected a positive value, but got %s' % val)
1433
1566
    def _do_skip(self, result, reason):
1434
1567
        addSkip = getattr(result, 'addSkip', None)
1435
1568
        if not callable(addSkip):
1436
 
            result.addError(self, sys.exc_info())
 
1569
            result.addSuccess(result)
1437
1570
        else:
1438
1571
            addSkip(self, reason)
1439
1572
 
 
1573
    def _do_known_failure(self, result):
 
1574
        err = sys.exc_info()
 
1575
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
 
1576
        if addExpectedFailure is not None:
 
1577
            addExpectedFailure(self, err)
 
1578
        else:
 
1579
            result.addSuccess(self)
 
1580
 
 
1581
    def _do_not_applicable(self, result, e):
 
1582
        if not e.args:
 
1583
            reason = 'No reason given'
 
1584
        else:
 
1585
            reason = e.args[0]
 
1586
        addNotApplicable = getattr(result, 'addNotApplicable', None)
 
1587
        if addNotApplicable is not None:
 
1588
            result.addNotApplicable(self, reason)
 
1589
        else:
 
1590
            self._do_skip(result, reason)
 
1591
 
 
1592
    def _do_unsupported_or_skip(self, result, reason):
 
1593
        addNotSupported = getattr(result, 'addNotSupported', None)
 
1594
        if addNotSupported is not None:
 
1595
            result.addNotSupported(self, reason)
 
1596
        else:
 
1597
            self._do_skip(result, reason)
 
1598
 
1440
1599
    def run(self, result=None):
1441
1600
        if result is None: result = self.defaultTestResult()
 
1601
        result.startTest(self)
 
1602
        try:
 
1603
            self._run(result)
 
1604
            return result
 
1605
        finally:
 
1606
            result.stopTest(self)
 
1607
 
 
1608
    def _run(self, result):
1442
1609
        for feature in getattr(self, '_test_needs_features', []):
1443
1610
            if not feature.available():
1444
 
                result.startTest(self)
1445
 
                if getattr(result, 'addNotSupported', None):
1446
 
                    result.addNotSupported(self, feature)
1447
 
                else:
1448
 
                    result.addSuccess(self)
1449
 
                result.stopTest(self)
1450
 
                return result
 
1611
                return self._do_unsupported_or_skip(result, feature)
1451
1612
        try:
 
1613
            absent_attr = object()
 
1614
            # Python 2.5
 
1615
            method_name = getattr(self, '_testMethodName', absent_attr)
 
1616
            if method_name is absent_attr:
 
1617
                # Python 2.4
 
1618
                method_name = getattr(self, '_TestCase__testMethodName')
 
1619
            testMethod = getattr(self, method_name)
1452
1620
            try:
1453
 
                result.startTest(self)
1454
 
                absent_attr = object()
1455
 
                # Python 2.5
1456
 
                method_name = getattr(self, '_testMethodName', absent_attr)
1457
 
                if method_name is absent_attr:
1458
 
                    # Python 2.4
1459
 
                    method_name = getattr(self, '_TestCase__testMethodName')
1460
 
                testMethod = getattr(self, method_name)
1461
 
                try:
1462
 
                    try:
1463
 
                        self.setUp()
1464
 
                        if not self._bzr_test_setUp_run:
1465
 
                            self.fail(
1466
 
                                "test setUp did not invoke "
1467
 
                                "bzrlib.tests.TestCase's setUp")
1468
 
                    except KeyboardInterrupt:
1469
 
                        self._runCleanups()
1470
 
                        raise
1471
 
                    except TestSkipped, e:
1472
 
                        self._do_skip(result, e.args[0])
1473
 
                        self.tearDown()
1474
 
                        return result
1475
 
                    except:
1476
 
                        result.addError(self, sys.exc_info())
1477
 
                        self._runCleanups()
1478
 
                        return result
1479
 
 
 
1621
                try:
 
1622
                    self.setUp()
 
1623
                    if not self._bzr_test_setUp_run:
 
1624
                        self.fail(
 
1625
                            "test setUp did not invoke "
 
1626
                            "bzrlib.tests.TestCase's setUp")
 
1627
                except KeyboardInterrupt:
 
1628
                    self._runCleanups()
 
1629
                    raise
 
1630
                except KnownFailure:
 
1631
                    self._do_known_failure(result)
 
1632
                    self.tearDown()
 
1633
                    return
 
1634
                except TestNotApplicable, e:
 
1635
                    self._do_not_applicable(result, e)
 
1636
                    self.tearDown()
 
1637
                    return
 
1638
                except TestSkipped, e:
 
1639
                    self._do_skip(result, e.args[0])
 
1640
                    self.tearDown()
 
1641
                    return result
 
1642
                except UnavailableFeature, e:
 
1643
                    self._do_unsupported_or_skip(result, e.args[0])
 
1644
                    self.tearDown()
 
1645
                    return
 
1646
                except:
 
1647
                    result.addError(self, sys.exc_info())
 
1648
                    self._runCleanups()
 
1649
                    return result
 
1650
 
 
1651
                ok = False
 
1652
                try:
 
1653
                    testMethod()
 
1654
                    ok = True
 
1655
                except KnownFailure:
 
1656
                    self._do_known_failure(result)
 
1657
                except self.failureException:
 
1658
                    result.addFailure(self, sys.exc_info())
 
1659
                except TestNotApplicable, e:
 
1660
                    self._do_not_applicable(result, e)
 
1661
                except TestSkipped, e:
 
1662
                    if not e.args:
 
1663
                        reason = "No reason given."
 
1664
                    else:
 
1665
                        reason = e.args[0]
 
1666
                    self._do_skip(result, reason)
 
1667
                except UnavailableFeature, e:
 
1668
                    self._do_unsupported_or_skip(result, e.args[0])
 
1669
                except KeyboardInterrupt:
 
1670
                    self._runCleanups()
 
1671
                    raise
 
1672
                except:
 
1673
                    result.addError(self, sys.exc_info())
 
1674
 
 
1675
                try:
 
1676
                    self.tearDown()
 
1677
                    if not self._bzr_test_tearDown_run:
 
1678
                        self.fail(
 
1679
                            "test tearDown did not invoke "
 
1680
                            "bzrlib.tests.TestCase's tearDown")
 
1681
                except KeyboardInterrupt:
 
1682
                    self._runCleanups()
 
1683
                    raise
 
1684
                except:
 
1685
                    result.addError(self, sys.exc_info())
 
1686
                    self._runCleanups()
1480
1687
                    ok = False
1481
 
                    try:
1482
 
                        testMethod()
1483
 
                        ok = True
1484
 
                    except self.failureException:
1485
 
                        result.addFailure(self, sys.exc_info())
1486
 
                    except TestSkipped, e:
1487
 
                        if not e.args:
1488
 
                            reason = "No reason given."
1489
 
                        else:
1490
 
                            reason = e.args[0]
1491
 
                        self._do_skip(result, reason)
1492
 
                    except KeyboardInterrupt:
1493
 
                        self._runCleanups()
1494
 
                        raise
1495
 
                    except:
1496
 
                        result.addError(self, sys.exc_info())
1497
 
 
1498
 
                    try:
1499
 
                        self.tearDown()
1500
 
                        if not self._bzr_test_tearDown_run:
1501
 
                            self.fail(
1502
 
                                "test tearDown did not invoke "
1503
 
                                "bzrlib.tests.TestCase's tearDown")
1504
 
                    except KeyboardInterrupt:
1505
 
                        self._runCleanups()
1506
 
                        raise
1507
 
                    except:
1508
 
                        result.addError(self, sys.exc_info())
1509
 
                        self._runCleanups()
1510
 
                        ok = False
1511
 
                    if ok: result.addSuccess(self)
1512
 
                finally:
1513
 
                    result.stopTest(self)
 
1688
                if ok: result.addSuccess(self)
1514
1689
                return result
1515
 
            except TestNotApplicable:
1516
 
                # Not moved from the result [yet].
1517
 
                self._runCleanups()
1518
 
                raise
1519
1690
            except KeyboardInterrupt:
1520
1691
                self._runCleanups()
1521
1692
                raise
1629
1800
 
1630
1801
    def _run_bzr_core(self, args, retcode, encoding, stdin,
1631
1802
            working_dir):
 
1803
        # Clear chk_map page cache, because the contents are likely to mask
 
1804
        # locking errors.
 
1805
        chk_map.clear_cache()
1632
1806
        if encoding is None:
1633
1807
            encoding = osutils.get_user_encoding()
1634
1808
        stdout = StringIOWrapper()
1670
1844
        if retcode is not None:
1671
1845
            self.assertEquals(retcode, result,
1672
1846
                              message='Unexpected return code')
1673
 
        return out, err
 
1847
        return result, out, err
1674
1848
 
1675
1849
    def run_bzr(self, args, retcode=0, encoding=None, stdin=None,
1676
1850
                working_dir=None, error_regexes=[], output_encoding=None):
1705
1879
        :keyword error_regexes: A list of expected error messages.  If
1706
1880
            specified they must be seen in the error output of the command.
1707
1881
        """
1708
 
        out, err = self._run_bzr_autosplit(
 
1882
        retcode, out, err = self._run_bzr_autosplit(
1709
1883
            args=args,
1710
1884
            retcode=retcode,
1711
1885
            encoding=encoding,
1862
2036
        """
1863
2037
        return Popen(*args, **kwargs)
1864
2038
 
 
2039
    def get_source_path(self):
 
2040
        """Return the path of the directory containing bzrlib."""
 
2041
        return os.path.dirname(os.path.dirname(bzrlib.__file__))
 
2042
 
1865
2043
    def get_bzr_path(self):
1866
2044
        """Return the path of the 'bzr' executable for this test suite."""
1867
 
        bzr_path = os.path.dirname(os.path.dirname(bzrlib.__file__))+'/bzr'
 
2045
        bzr_path = self.get_source_path()+'/bzr'
1868
2046
        if not os.path.isfile(bzr_path):
1869
2047
            # We are probably installed. Assume sys.argv is the right file
1870
2048
            bzr_path = sys.argv[0]
2196
2374
        propagating. This method ensures than a test did not leaked.
2197
2375
        """
2198
2376
        root = TestCaseWithMemoryTransport.TEST_ROOT
 
2377
        self.permit_url(get_transport(root).base)
2199
2378
        wt = workingtree.WorkingTree.open(root)
2200
2379
        last_rev = wt.last_revision()
2201
2380
        if last_rev != 'null:':
2209
2388
 
2210
2389
    def _make_test_root(self):
2211
2390
        if TestCaseWithMemoryTransport.TEST_ROOT is None:
2212
 
            root = osutils.mkdtemp(prefix='testbzr-', suffix='.tmp')
 
2391
            # Watch out for tricky test dir (on OSX /tmp -> /private/tmp)
 
2392
            root = osutils.realpath(osutils.mkdtemp(prefix='testbzr-',
 
2393
                                                    suffix='.tmp'))
2213
2394
            TestCaseWithMemoryTransport.TEST_ROOT = root
2214
2395
 
2215
2396
            self._create_safety_net()
2218
2399
            # specifically told when all tests are finished.  This will do.
2219
2400
            atexit.register(_rmtree_temp_dir, root)
2220
2401
 
 
2402
        self.permit_dir(TestCaseWithMemoryTransport.TEST_ROOT)
2221
2403
        self.addCleanup(self._check_safety_net)
2222
2404
 
2223
2405
    def makeAndChdirToTestDir(self):
2231
2413
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2232
2414
        self.test_dir = TestCaseWithMemoryTransport.TEST_ROOT
2233
2415
        self.test_home_dir = self.test_dir + "/MemoryTransportMissingHomeDir"
 
2416
        self.permit_dir(self.test_dir)
2234
2417
 
2235
2418
    def make_branch(self, relpath, format=None):
2236
2419
        """Create a branch on the transport at relpath."""
2368
2551
            if os.path.exists(name):
2369
2552
                name = name_prefix + '_' + str(i)
2370
2553
            else:
2371
 
                os.mkdir(name)
 
2554
                # now create test and home directories within this dir
 
2555
                self.test_base_dir = name
 
2556
                self.addCleanup(self.deleteTestDir)
 
2557
                os.mkdir(self.test_base_dir)
2372
2558
                break
2373
 
        # now create test and home directories within this dir
2374
 
        self.test_base_dir = name
 
2559
        self.permit_dir(self.test_base_dir)
 
2560
        # 'sprouting' and 'init' of a branch both walk up the tree to find
 
2561
        # stacking policy to honour; create a bzr dir with an unshared
 
2562
        # repository (but not a branch - our code would be trying to escape
 
2563
        # then!) to stop them, and permit it to be read.
 
2564
        # control = bzrdir.BzrDir.create(self.test_base_dir)
 
2565
        # control.create_repository()
2375
2566
        self.test_home_dir = self.test_base_dir + '/home'
2376
2567
        os.mkdir(self.test_home_dir)
2377
2568
        self.test_dir = self.test_base_dir + '/work'
2383
2574
            f.write(self.id())
2384
2575
        finally:
2385
2576
            f.close()
2386
 
        self.addCleanup(self.deleteTestDir)
2387
2577
 
2388
2578
    def deleteTestDir(self):
2389
2579
        os.chdir(TestCaseWithMemoryTransport.TEST_ROOT)
2554
2744
        super(TestCaseWithTransport, self).setUp()
2555
2745
        self.__vfs_server = None
2556
2746
 
 
2747
    def disable_missing_extensions_warning(self):
 
2748
        """Some tests expect a precise stderr content.
 
2749
 
 
2750
        There is no point in forcing them to duplicate the extension related
 
2751
        warning.
 
2752
        """
 
2753
        config.GlobalConfig().set_user_option('ignore_missing_extensions', True)
 
2754
 
2557
2755
 
2558
2756
class ChrootedTestCase(TestCaseWithTransport):
2559
2757
    """A support class that provides readonly urls outside the local namespace.
3052
3250
    concurrency = osutils.local_concurrency()
3053
3251
    result = []
3054
3252
    from subunit import TestProtocolClient, ProtocolTestCase
3055
 
    try:
3056
 
        from subunit.test_results import AutoTimingTestResultDecorator
3057
 
    except ImportError:
3058
 
        AutoTimingTestResultDecorator = lambda x:x
3059
3253
    class TestInOtherProcess(ProtocolTestCase):
3060
3254
        # Should be in subunit, I think. RBC.
3061
3255
        def __init__(self, stream, pid):
3084
3278
                sys.stdin.close()
3085
3279
                sys.stdin = None
3086
3280
                stream = os.fdopen(c2pwrite, 'wb', 1)
3087
 
                subunit_result = AutoTimingTestResultDecorator(
 
3281
                subunit_result = BzrAutoTimingTestResultDecorator(
3088
3282
                    TestProtocolClient(stream))
3089
3283
                process_suite.run(subunit_result)
3090
3284
            finally:
3192
3386
    def addFailure(self, test, err):
3193
3387
        known = self._error_looks_like('KnownFailure: ', err)
3194
3388
        if known is not None:
3195
 
            self.result._addKnownFailure(test, [KnownFailure,
3196
 
                                                KnownFailure(known), None])
 
3389
            self.result.addExpectedFailure(test,
 
3390
                [KnownFailure, KnownFailure(known), None])
3197
3391
        else:
3198
3392
            self.result.addFailure(test, err)
3199
3393
 
3214
3408
        return value
3215
3409
 
3216
3410
 
 
3411
try:
 
3412
    from subunit.test_results import AutoTimingTestResultDecorator
 
3413
    # Expected failure should be seen as a success not a failure Once subunit
 
3414
    # provide native support for that, BZRTransformingResult and this class
 
3415
    # will become useless.
 
3416
    class BzrAutoTimingTestResultDecorator(AutoTimingTestResultDecorator):
 
3417
 
 
3418
        def addExpectedFailure(self, test, err):
 
3419
            self._before_event()
 
3420
            return self._call_maybe("addExpectedFailure", self._degrade_skip,
 
3421
                                    test, err)
 
3422
except ImportError:
 
3423
    # Let's just define a no-op decorator
 
3424
    BzrAutoTimingTestResultDecorator = lambda x:x
 
3425
 
 
3426
 
3217
3427
class ProfileResult(ForwardingResult):
3218
3428
    """Generate profiling data for all activity between start and success.
3219
3429
    
3494
3704
        'bzrlib.tests.commands',
3495
3705
        'bzrlib.tests.per_branch',
3496
3706
        'bzrlib.tests.per_bzrdir',
 
3707
        'bzrlib.tests.per_foreign_vcs',
3497
3708
        'bzrlib.tests.per_interrepository',
3498
3709
        'bzrlib.tests.per_intertree',
3499
3710
        'bzrlib.tests.per_inventory',
3505
3716
        'bzrlib.tests.per_repository',
3506
3717
        'bzrlib.tests.per_repository_chk',
3507
3718
        'bzrlib.tests.per_repository_reference',
 
3719
        'bzrlib.tests.per_uifactory',
3508
3720
        'bzrlib.tests.per_versionedfile',
3509
3721
        'bzrlib.tests.per_workingtree',
3510
3722
        'bzrlib.tests.test__annotator',
3513
3725
        'bzrlib.tests.test__groupcompress',
3514
3726
        'bzrlib.tests.test__known_graph',
3515
3727
        'bzrlib.tests.test__rio',
 
3728
        'bzrlib.tests.test__simple_set',
 
3729
        'bzrlib.tests.test__static_tuple',
3516
3730
        'bzrlib.tests.test__walkdirs_win32',
3517
3731
        'bzrlib.tests.test_ancestry',
3518
3732
        'bzrlib.tests.test_annotate',
3533
3747
        'bzrlib.tests.test_chk_serializer',
3534
3748
        'bzrlib.tests.test_chunk_writer',
3535
3749
        'bzrlib.tests.test_clean_tree',
 
3750
        'bzrlib.tests.test_cleanup',
3536
3751
        'bzrlib.tests.test_commands',
3537
3752
        'bzrlib.tests.test_commit',
3538
3753
        'bzrlib.tests.test_commit_merge',
3618
3833
        'bzrlib.tests.test_rio',
3619
3834
        'bzrlib.tests.test_rules',
3620
3835
        'bzrlib.tests.test_sampler',
 
3836
        'bzrlib.tests.test_script',
3621
3837
        'bzrlib.tests.test_selftest',
3622
3838
        'bzrlib.tests.test_serializer',
3623
3839
        'bzrlib.tests.test_setup',
4046
4262
HTTPSServerFeature = _HTTPSServerFeature()
4047
4263
 
4048
4264
 
 
4265
class _ParamikoFeature(Feature):
 
4266
    """Is paramiko available?"""
 
4267
 
 
4268
    def _probe(self):
 
4269
        try:
 
4270
            from bzrlib.transport.sftp import SFTPAbsoluteServer
 
4271
            return True
 
4272
        except errors.ParamikoNotPresent:
 
4273
            return False
 
4274
 
 
4275
    def feature_name(self):
 
4276
        return "Paramiko"
 
4277
 
 
4278
 
 
4279
ParamikoFeature = _ParamikoFeature()
 
4280
 
 
4281
 
4049
4282
class _UnicodeFilename(Feature):
4050
4283
    """Does the filesystem support Unicode filenames?"""
4051
4284
 
4077
4310
UTF8Filesystem = _UTF8Filesystem()
4078
4311
 
4079
4312
 
 
4313
class _BreakinFeature(Feature):
 
4314
    """Does this platform support the breakin feature?"""
 
4315
 
 
4316
    def _probe(self):
 
4317
        from bzrlib import breakin
 
4318
        if breakin.determine_signal() is None:
 
4319
            return False
 
4320
        if sys.platform == 'win32':
 
4321
            # Windows doesn't have os.kill, and we catch the SIGBREAK signal.
 
4322
            # We trigger SIGBREAK via a Console api so we need ctypes to
 
4323
            # access the function
 
4324
            try:
 
4325
                import ctypes
 
4326
            except OSError:
 
4327
                return False
 
4328
        return True
 
4329
 
 
4330
    def feature_name(self):
 
4331
        return "SIGQUIT or SIGBREAK w/ctypes on win32"
 
4332
 
 
4333
 
 
4334
BreakinFeature = _BreakinFeature()
 
4335
 
 
4336
 
4080
4337
class _CaseInsCasePresFilenameFeature(Feature):
4081
4338
    """Is the file-system case insensitive, but case-preserving?"""
4082
4339
 
4149
4406
# Only define SubUnitBzrRunner if subunit is available.
4150
4407
try:
4151
4408
    from subunit import TestProtocolClient
4152
 
    try:
4153
 
        from subunit.test_results import AutoTimingTestResultDecorator
4154
 
    except ImportError:
4155
 
        AutoTimingTestResultDecorator = lambda x:x
4156
4409
    class SubUnitBzrRunner(TextTestRunner):
4157
4410
        def run(self, test):
4158
 
            result = AutoTimingTestResultDecorator(
 
4411
            result = BzrAutoTimingTestResultDecorator(
4159
4412
                TestProtocolClient(self.stream))
4160
4413
            test.run(result)
4161
4414
            return result