~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-02-11 04:02:41 UTC
  • mfrom: (5017.2.2 tariff)
  • Revision ID: pqm@pqm.ubuntu.com-20100211040241-w6n021dz0uus341n
(mbp) add import-tariff tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
102
101
    deprecated_passed,
103
102
    )
104
103
import bzrlib.trace
105
 
from bzrlib.transport import (
106
 
    get_transport,
107
 
    memory,
108
 
    pathfilter,
109
 
    )
 
104
from bzrlib.transport import get_transport, pathfilter
110
105
import bzrlib.transport
 
106
from bzrlib.transport.local import LocalURLServer
 
107
from bzrlib.transport.memory import MemoryServer
 
108
from bzrlib.transport.readonly import ReadonlyServer
111
109
from bzrlib.trace import mutter, note
112
 
from bzrlib.tests import (
113
 
    test_server,
114
 
    TestUtil,
115
 
    )
 
110
from bzrlib.tests import TestUtil
116
111
from bzrlib.tests.http_server import HttpServer
117
112
from bzrlib.tests.TestUtil import (
118
113
                          TestSuite,
129
124
# shown frame is the test code, not our assertXYZ.
130
125
__unittest = 1
131
126
 
132
 
default_transport = test_server.LocalURLServer
 
127
default_transport = LocalURLServer
133
128
 
134
129
 
135
130
_unitialized_attr = object()
490
485
        return self._shortened_test_description(test)
491
486
 
492
487
    def report_error(self, test, err):
493
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
488
        self.ui.note('ERROR: %s\n    %s\n' % (
494
489
            self._test_description(test),
495
490
            err[1],
496
491
            ))
497
492
 
498
493
    def report_failure(self, test, err):
499
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
494
        self.ui.note('FAIL: %s\n    %s\n' % (
500
495
            self._test_description(test),
501
496
            err[1],
502
497
            ))
1045
1040
        if t.base.endswith('/work/'):
1046
1041
            # we have safety net/test root/work
1047
1042
            t = t.clone('../..')
1048
 
        elif isinstance(transport_server,
1049
 
                        test_server.SmartTCPServer_for_testing):
 
1043
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
1050
1044
            # The smart server adds a path similar to work, which is traversed
1051
1045
            # up from by the client. But the server is chrooted - the actual
1052
1046
            # backing transport is not escaped from, and VFS requests to the
1313
1307
            f.close()
1314
1308
        self.assertEqualDiff(content, s)
1315
1309
 
1316
 
    def assertDocstring(self, expected_docstring, obj):
1317
 
        """Fail if obj does not have expected_docstring"""
1318
 
        if __doc__ is None:
1319
 
            # With -OO the docstring should be None instead
1320
 
            self.assertIs(obj.__doc__, None)
1321
 
        else:
1322
 
            self.assertEqual(expected_docstring, obj.__doc__)
1323
 
 
1324
1310
    def failUnlessExists(self, path):
1325
1311
        """Fail unless path or paths, which may be abs or relative, exist."""
1326
1312
        if not isinstance(path, basestring):
1528
1514
            'BZR_PROGRESS_BAR': None,
1529
1515
            'BZR_LOG': None,
1530
1516
            'BZR_PLUGIN_PATH': None,
1531
 
            'BZR_DISABLE_PLUGINS': None,
1532
 
            'BZR_PLUGINS_AT': None,
1533
1517
            'BZR_CONCURRENCY': None,
1534
1518
            # Make sure that any text ui tests are consistent regardless of
1535
1519
            # the environment the test case is run in; you may want tests that
1682
1666
                unicodestr = log_contents.decode('utf8', 'replace')
1683
1667
                log_contents = unicodestr.encode('utf8')
1684
1668
            if not keep_log_file:
1685
 
                close_attempts = 0
1686
 
                max_close_attempts = 100
1687
 
                first_close_error = None
1688
 
                while close_attempts < max_close_attempts:
1689
 
                    close_attempts += 1
1690
 
                    try:
1691
 
                        self._log_file.close()
1692
 
                    except IOError, ioe:
1693
 
                        if ioe.errno is None:
1694
 
                            # No errno implies 'close() called during
1695
 
                            # concurrent operation on the same file object', so
1696
 
                            # retry.  Probably a thread is trying to write to
1697
 
                            # the log file.
1698
 
                            if first_close_error is None:
1699
 
                                first_close_error = ioe
1700
 
                            continue
1701
 
                        raise
1702
 
                    else:
1703
 
                        break
1704
 
                if close_attempts > 1:
1705
 
                    sys.stderr.write(
1706
 
                        'Unable to close log file on first attempt, '
1707
 
                        'will retry: %s\n' % (first_close_error,))
1708
 
                    if close_attempts == max_close_attempts:
1709
 
                        sys.stderr.write(
1710
 
                            'Unable to close log file after %d attempts.\n'
1711
 
                            % (max_close_attempts,))
 
1669
                self._log_file.close()
1712
1670
                self._log_file = None
1713
1671
                # Permit multiple calls to get_log until we clean it up in
1714
1672
                # finishLogFile
2217
2175
        if self.__readonly_server is None:
2218
2176
            if self.transport_readonly_server is None:
2219
2177
                # readonly decorator requested
2220
 
                self.__readonly_server = test_server.ReadonlyServer()
 
2178
                self.__readonly_server = ReadonlyServer()
2221
2179
            else:
2222
2180
                # explicit readonly transport.
2223
2181
                self.__readonly_server = self.create_transport_readonly_server()
2246
2204
        is no means to override it.
2247
2205
        """
2248
2206
        if self.__vfs_server is None:
2249
 
            self.__vfs_server = memory.MemoryServer()
 
2207
            self.__vfs_server = MemoryServer()
2250
2208
            self.start_server(self.__vfs_server)
2251
2209
        return self.__vfs_server
2252
2210
 
2409
2367
        return made_control.create_repository(shared=shared)
2410
2368
 
2411
2369
    def make_smart_server(self, path):
2412
 
        smart_server = test_server.SmartTCPServer_for_testing()
 
2370
        smart_server = server.SmartTCPServer_for_testing()
2413
2371
        self.start_server(smart_server, self.get_server())
2414
2372
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2415
2373
        return remote_transport
2442
2400
 
2443
2401
    def setup_smart_server_with_call_log(self):
2444
2402
        """Sets up a smart server as the transport server with a call log."""
2445
 
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2403
        self.transport_server = server.SmartTCPServer_for_testing
2446
2404
        self.hpss_calls = []
2447
2405
        import traceback
2448
2406
        # Skip the current stack down to the caller of
2661
2619
            # We can only make working trees locally at the moment.  If the
2662
2620
            # transport can't support them, then we keep the non-disk-backed
2663
2621
            # branch and create a local checkout.
2664
 
            if self.vfs_transport_factory is test_server.LocalURLServer:
 
2622
            if self.vfs_transport_factory is LocalURLServer:
2665
2623
                # the branch is colocated on disk, we cannot create a checkout.
2666
2624
                # hopefully callers will expect this.
2667
2625
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2727
2685
 
2728
2686
    def setUp(self):
2729
2687
        super(ChrootedTestCase, self).setUp()
2730
 
        if not self.vfs_transport_factory == memory.MemoryServer:
 
2688
        if not self.vfs_transport_factory == MemoryServer:
2731
2689
            self.transport_readonly_server = HttpServer
2732
2690
 
2733
2691
 
3201
3159
    return result
3202
3160
 
3203
3161
 
3204
 
def workaround_zealous_crypto_random():
3205
 
    """Crypto.Random want to help us being secure, but we don't care here.
3206
 
 
3207
 
    This workaround some test failure related to the sftp server. Once paramiko
3208
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3209
 
    """
3210
 
    try:
3211
 
        from Crypto.Random import atfork
3212
 
        atfork()
3213
 
    except ImportError:
3214
 
        pass
3215
 
 
3216
 
 
3217
3162
def fork_for_tests(suite):
3218
3163
    """Take suite and start up one runner per CPU by forking()
3219
3164
 
3234
3179
            try:
3235
3180
                ProtocolTestCase.run(self, result)
3236
3181
            finally:
3237
 
                os.waitpid(self.pid, 0)
 
3182
                os.waitpid(self.pid, os.WNOHANG)
3238
3183
 
3239
3184
    test_blocks = partition_tests(suite, concurrency)
3240
3185
    for process_tests in test_blocks:
3243
3188
        c2pread, c2pwrite = os.pipe()
3244
3189
        pid = os.fork()
3245
3190
        if pid == 0:
3246
 
            workaround_zealous_crypto_random()
3247
3191
            try:
3248
3192
                os.close(c2pread)
3249
3193
                # Leave stderr and stdout open so we can see test noise
3634
3578
        'bzrlib.tests.commands',
3635
3579
        'bzrlib.tests.per_branch',
3636
3580
        'bzrlib.tests.per_bzrdir',
3637
 
        'bzrlib.tests.per_bzrdir_colo',
3638
3581
        'bzrlib.tests.per_foreign_vcs',
3639
3582
        'bzrlib.tests.per_interrepository',
3640
3583
        'bzrlib.tests.per_intertree',
3680
3623
        'bzrlib.tests.test_chunk_writer',
3681
3624
        'bzrlib.tests.test_clean_tree',
3682
3625
        'bzrlib.tests.test_cleanup',
3683
 
        'bzrlib.tests.test_cmdline',
3684
3626
        'bzrlib.tests.test_commands',
3685
3627
        'bzrlib.tests.test_commit',
3686
3628
        'bzrlib.tests.test_commit_merge',
3734
3676
        'bzrlib.tests.test_lru_cache',
3735
3677
        'bzrlib.tests.test_lsprof',
3736
3678
        'bzrlib.tests.test_mail_client',
3737
 
        'bzrlib.tests.test_matchers',
3738
3679
        'bzrlib.tests.test_memorytree',
3739
3680
        'bzrlib.tests.test_merge',
3740
3681
        'bzrlib.tests.test_merge3',
3820
3761
 
3821
3762
 
3822
3763
def _test_suite_modules_to_doctest():
3823
 
    """Return the list of modules to doctest."""
3824
 
    if __doc__ is None:
3825
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3826
 
        return []
 
3764
    """Return the list of modules to doctest."""   
3827
3765
    return [
3828
3766
        'bzrlib',
3829
3767
        'bzrlib.branchbuilder',
4429
4367
CaseInsensitiveFilesystemFeature = _CaseInsensitiveFilesystemFeature()
4430
4368
 
4431
4369
 
4432
 
class _CaseSensitiveFilesystemFeature(Feature):
4433
 
 
4434
 
    def _probe(self):
4435
 
        if CaseInsCasePresFilenameFeature.available():
4436
 
            return False
4437
 
        elif CaseInsensitiveFilesystemFeature.available():
4438
 
            return False
4439
 
        else:
4440
 
            return True
4441
 
 
4442
 
    def feature_name(self):
4443
 
        return 'case-sensitive filesystem'
4444
 
 
4445
 
# new coding style is for feature instances to be lowercase
4446
 
case_sensitive_filesystem_feature = _CaseSensitiveFilesystemFeature()
4447
 
 
4448
 
 
4449
4370
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4450
4371
SubUnitFeature = _CompatabilityThunkFeature(
4451
4372
    deprecated_in((2,1,0)),
4462
4383
            return result
4463
4384
except ImportError:
4464
4385
    pass
4465
 
 
4466
 
class _PosixPermissionsFeature(Feature):
4467
 
 
4468
 
    def _probe(self):
4469
 
        def has_perms():
4470
 
            # create temporary file and check if specified perms are maintained.
4471
 
            import tempfile
4472
 
 
4473
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4474
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4475
 
            fd, name = f
4476
 
            os.close(fd)
4477
 
            os.chmod(name, write_perms)
4478
 
 
4479
 
            read_perms = os.stat(name).st_mode & 0777
4480
 
            os.unlink(name)
4481
 
            return (write_perms == read_perms)
4482
 
 
4483
 
        return (os.name == 'posix') and has_perms()
4484
 
 
4485
 
    def feature_name(self):
4486
 
        return 'POSIX permissions support'
4487
 
 
4488
 
posix_permissions_feature = _PosixPermissionsFeature()