~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Robert Collins
  • Date: 2010-04-08 04:34:03 UTC
  • mfrom: (5138 +trunk)
  • mto: This revision was merged to the branch mainline in revision 5139.
  • Revision ID: robertc@robertcollins.net-20100408043403-56z0d07vdqrx7f3t
Update bugfix for 528114 to trunk.

Show diffs side-by-side

added added

removed removed

Lines of Context:
101
101
    deprecated_passed,
102
102
    )
103
103
import bzrlib.trace
104
 
from bzrlib.transport import get_transport, pathfilter
 
104
from bzrlib.transport import (
 
105
    get_transport,
 
106
    memory,
 
107
    pathfilter,
 
108
    )
105
109
import bzrlib.transport
106
 
from bzrlib.transport.local import LocalURLServer
107
 
from bzrlib.transport.memory import MemoryServer
108
 
from bzrlib.transport.readonly import ReadonlyServer
109
110
from bzrlib.trace import mutter, note
110
 
from bzrlib.tests import TestUtil
 
111
from bzrlib.tests import (
 
112
    test_server,
 
113
    TestUtil,
 
114
    )
111
115
from bzrlib.tests.http_server import HttpServer
112
116
from bzrlib.tests.TestUtil import (
113
117
                          TestSuite,
124
128
# shown frame is the test code, not our assertXYZ.
125
129
__unittest = 1
126
130
 
127
 
default_transport = LocalURLServer
 
131
default_transport = test_server.LocalURLServer
 
132
 
 
133
 
 
134
_unitialized_attr = object()
 
135
"""A sentinel needed to act as a default value in a method signature."""
 
136
 
128
137
 
129
138
# Subunit result codes, defined here to prevent a hard dependency on subunit.
130
139
SUBUNIT_SEEK_SET = 0
480
489
        return self._shortened_test_description(test)
481
490
 
482
491
    def report_error(self, test, err):
483
 
        ui.ui_factory.note('ERROR: %s\n    %s\n' % (
 
492
        self.ui.note('ERROR: %s\n    %s\n' % (
484
493
            self._test_description(test),
485
494
            err[1],
486
495
            ))
487
496
 
488
497
    def report_failure(self, test, err):
489
 
        ui.ui_factory.note('FAIL: %s\n    %s\n' % (
 
498
        self.ui.note('FAIL: %s\n    %s\n' % (
490
499
            self._test_description(test),
491
500
            err[1],
492
501
            ))
697
706
    """
698
707
 
699
708
 
700
 
class CommandFailed(Exception):
701
 
    pass
702
 
 
703
 
 
704
709
class StringIOWrapper(object):
705
710
    """A wrapper around cStringIO which just adds an encoding attribute.
706
711
 
850
855
        Tests that want to use debug flags can just set them in the
851
856
        debug_flags set during setup/teardown.
852
857
        """
853
 
        self._preserved_debug_flags = set(debug.debug_flags)
 
858
        # Start with a copy of the current debug flags we can safely modify.
 
859
        self.overrideAttr(debug, 'debug_flags', set(debug.debug_flags))
854
860
        if 'allow_debug' not in selftest_debug_flags:
855
861
            debug.debug_flags.clear()
856
862
        if 'disable_lock_checks' not in selftest_debug_flags:
857
863
            debug.debug_flags.add('strict_locks')
858
 
        self.addCleanup(self._restore_debug_flags)
859
864
 
860
865
    def _clear_hooks(self):
861
866
        # prevent hooks affecting tests
882
887
    def _silenceUI(self):
883
888
        """Turn off UI for duration of test"""
884
889
        # by default the UI is off; tests can turn it on if they want it.
885
 
        saved = ui.ui_factory
886
 
        def _restore():
887
 
            ui.ui_factory = saved
888
 
        ui.ui_factory = ui.SilentUIFactory()
889
 
        self.addCleanup(_restore)
 
890
        self.overrideAttr(ui, 'ui_factory', ui.SilentUIFactory())
890
891
 
891
892
    def _check_locks(self):
892
893
        """Check that all lock take/release actions have been paired."""
1043
1044
        if t.base.endswith('/work/'):
1044
1045
            # we have safety net/test root/work
1045
1046
            t = t.clone('../..')
1046
 
        elif isinstance(transport_server, server.SmartTCPServer_for_testing):
 
1047
        elif isinstance(transport_server,
 
1048
                        test_server.SmartTCPServer_for_testing):
1047
1049
            # The smart server adds a path similar to work, which is traversed
1048
1050
            # up from by the client. But the server is chrooted - the actual
1049
1051
            # backing transport is not escaped from, and VFS requests to the
1204
1206
            raise AssertionError('pattern "%s" found in "%s"'
1205
1207
                    % (needle_re, haystack))
1206
1208
 
 
1209
    def assertContainsString(self, haystack, needle):
 
1210
        if haystack.find(needle) == -1:
 
1211
            self.fail("string %r not found in '''%s'''" % (needle, haystack))
 
1212
 
1207
1213
    def assertSubset(self, sublist, superlist):
1208
1214
        """Assert that every entry in sublist is present in superlist."""
1209
1215
        missing = set(sublist) - set(superlist)
1479
1485
        """
1480
1486
        self._cleanups.append((callable, args, kwargs))
1481
1487
 
 
1488
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
 
1489
        """Overrides an object attribute restoring it after the test.
 
1490
 
 
1491
        :param obj: The object that will be mutated.
 
1492
 
 
1493
        :param attr_name: The attribute name we want to preserve/override in
 
1494
            the object.
 
1495
 
 
1496
        :param new: The optional value we want to set the attribute to.
 
1497
 
 
1498
        :returns: The actual attr value.
 
1499
        """
 
1500
        value = getattr(obj, attr_name)
 
1501
        # The actual value is captured by the call below
 
1502
        self.addCleanup(setattr, obj, attr_name, value)
 
1503
        if new is not _unitialized_attr:
 
1504
            setattr(obj, attr_name, new)
 
1505
        return value
 
1506
 
1482
1507
    def _cleanEnvironment(self):
1483
1508
        new_env = {
1484
1509
            'BZR_HOME': None, # Don't inherit BZR_HOME to all the tests.
1494
1519
            'BZR_PROGRESS_BAR': None,
1495
1520
            'BZR_LOG': None,
1496
1521
            'BZR_PLUGIN_PATH': None,
 
1522
            'BZR_DISABLE_PLUGINS': None,
 
1523
            'BZR_PLUGINS_AT': None,
1497
1524
            'BZR_CONCURRENCY': None,
1498
1525
            # Make sure that any text ui tests are consistent regardless of
1499
1526
            # the environment the test case is run in; you may want tests that
1520
1547
            'ftp_proxy': None,
1521
1548
            'FTP_PROXY': None,
1522
1549
            'BZR_REMOTE_PATH': None,
 
1550
            # Generally speaking, we don't want apport reporting on crashes in
 
1551
            # the test envirnoment unless we're specifically testing apport,
 
1552
            # so that it doesn't leak into the real system environment.  We
 
1553
            # use an env var so it propagates to subprocesses.
 
1554
            'APPORT_DISABLE': '1',
1523
1555
        }
1524
 
        self.__old_env = {}
 
1556
        self._old_env = {}
1525
1557
        self.addCleanup(self._restoreEnvironment)
1526
1558
        for name, value in new_env.iteritems():
1527
1559
            self._captureVar(name, value)
1528
1560
 
1529
1561
    def _captureVar(self, name, newvalue):
1530
1562
        """Set an environment variable, and reset it when finished."""
1531
 
        self.__old_env[name] = osutils.set_or_unset_env(name, newvalue)
1532
 
 
1533
 
    def _restore_debug_flags(self):
1534
 
        debug.debug_flags.clear()
1535
 
        debug.debug_flags.update(self._preserved_debug_flags)
 
1563
        self._old_env[name] = osutils.set_or_unset_env(name, newvalue)
1536
1564
 
1537
1565
    def _restoreEnvironment(self):
1538
 
        for name, value in self.__old_env.iteritems():
 
1566
        for name, value in self._old_env.iteritems():
1539
1567
            osutils.set_or_unset_env(name, value)
1540
1568
 
1541
1569
    def _restoreHooks(self):
1645
1673
                unicodestr = log_contents.decode('utf8', 'replace')
1646
1674
                log_contents = unicodestr.encode('utf8')
1647
1675
            if not keep_log_file:
1648
 
                self._log_file.close()
 
1676
                close_attempts = 0
 
1677
                max_close_attempts = 100
 
1678
                first_close_error = None
 
1679
                while close_attempts < max_close_attempts:
 
1680
                    close_attempts += 1
 
1681
                    try:
 
1682
                        self._log_file.close()
 
1683
                    except IOError, ioe:
 
1684
                        if ioe.errno is None:
 
1685
                            # No errno implies 'close() called during
 
1686
                            # concurrent operation on the same file object', so
 
1687
                            # retry.  Probably a thread is trying to write to
 
1688
                            # the log file.
 
1689
                            if first_close_error is None:
 
1690
                                first_close_error = ioe
 
1691
                            continue
 
1692
                        raise
 
1693
                    else:
 
1694
                        break
 
1695
                if close_attempts > 1:
 
1696
                    sys.stderr.write(
 
1697
                        'Unable to close log file on first attempt, '
 
1698
                        'will retry: %s\n' % (first_close_error,))
 
1699
                    if close_attempts == max_close_attempts:
 
1700
                        sys.stderr.write(
 
1701
                            'Unable to close log file after %d attempts.\n'
 
1702
                            % (max_close_attempts,))
1649
1703
                self._log_file = None
1650
1704
                # Permit multiple calls to get_log until we clean it up in
1651
1705
                # finishLogFile
2037
2091
 
2038
2092
        Tests that expect to provoke LockContention errors should call this.
2039
2093
        """
2040
 
        orig_timeout = bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS
2041
 
        def resetTimeout():
2042
 
            bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = orig_timeout
2043
 
        self.addCleanup(resetTimeout)
2044
 
        bzrlib.lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
2094
        self.overrideAttr(bzrlib.lockdir, '_DEFAULT_TIMEOUT_SECONDS', 0)
2045
2095
 
2046
2096
    def make_utf8_encoded_stringio(self, encoding_type=None):
2047
2097
        """Return a StringIOWrapper instance, that will encode Unicode
2061
2111
        request_handlers = request.request_handlers
2062
2112
        orig_method = request_handlers.get(verb)
2063
2113
        request_handlers.remove(verb)
2064
 
        def restoreVerb():
2065
 
            request_handlers.register(verb, orig_method)
2066
 
        self.addCleanup(restoreVerb)
 
2114
        self.addCleanup(request_handlers.register, verb, orig_method)
2067
2115
 
2068
2116
 
2069
2117
class CapturedCall(object):
2160
2208
        if self.__readonly_server is None:
2161
2209
            if self.transport_readonly_server is None:
2162
2210
                # readonly decorator requested
2163
 
                self.__readonly_server = ReadonlyServer()
 
2211
                self.__readonly_server = test_server.ReadonlyServer()
2164
2212
            else:
2165
2213
                # explicit readonly transport.
2166
2214
                self.__readonly_server = self.create_transport_readonly_server()
2189
2237
        is no means to override it.
2190
2238
        """
2191
2239
        if self.__vfs_server is None:
2192
 
            self.__vfs_server = MemoryServer()
 
2240
            self.__vfs_server = memory.MemoryServer()
2193
2241
            self.start_server(self.__vfs_server)
2194
2242
        return self.__vfs_server
2195
2243
 
2352
2400
        return made_control.create_repository(shared=shared)
2353
2401
 
2354
2402
    def make_smart_server(self, path):
2355
 
        smart_server = server.SmartTCPServer_for_testing()
 
2403
        smart_server = test_server.SmartTCPServer_for_testing()
2356
2404
        self.start_server(smart_server, self.get_server())
2357
2405
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2358
2406
        return remote_transport
2376
2424
    def setUp(self):
2377
2425
        super(TestCaseWithMemoryTransport, self).setUp()
2378
2426
        self._make_test_root()
2379
 
        _currentdir = os.getcwdu()
2380
 
        def _leaveDirectory():
2381
 
            os.chdir(_currentdir)
2382
 
        self.addCleanup(_leaveDirectory)
 
2427
        self.addCleanup(os.chdir, os.getcwdu())
2383
2428
        self.makeAndChdirToTestDir()
2384
2429
        self.overrideEnvironmentForTesting()
2385
2430
        self.__readonly_server = None
2388
2433
 
2389
2434
    def setup_smart_server_with_call_log(self):
2390
2435
        """Sets up a smart server as the transport server with a call log."""
2391
 
        self.transport_server = server.SmartTCPServer_for_testing
 
2436
        self.transport_server = test_server.SmartTCPServer_for_testing
2392
2437
        self.hpss_calls = []
2393
2438
        import traceback
2394
2439
        # Skip the current stack down to the caller of
2607
2652
            # We can only make working trees locally at the moment.  If the
2608
2653
            # transport can't support them, then we keep the non-disk-backed
2609
2654
            # branch and create a local checkout.
2610
 
            if self.vfs_transport_factory is LocalURLServer:
 
2655
            if self.vfs_transport_factory is test_server.LocalURLServer:
2611
2656
                # the branch is colocated on disk, we cannot create a checkout.
2612
2657
                # hopefully callers will expect this.
2613
2658
                local_controldir= bzrdir.BzrDir.open(self.get_vfs_only_url(relpath))
2673
2718
 
2674
2719
    def setUp(self):
2675
2720
        super(ChrootedTestCase, self).setUp()
2676
 
        if not self.vfs_transport_factory == MemoryServer:
 
2721
        if not self.vfs_transport_factory == memory.MemoryServer:
2677
2722
            self.transport_readonly_server = HttpServer
2678
2723
 
2679
2724
 
3550
3595
# appear prefixed ('bzrlib.' is "replaced" by 'bzrlib.').
3551
3596
test_prefix_alias_registry.register('bzrlib', 'bzrlib')
3552
3597
 
3553
 
# Obvious higest levels prefixes, feel free to add your own via a plugin
 
3598
# Obvious highest levels prefixes, feel free to add your own via a plugin
3554
3599
test_prefix_alias_registry.register('bd', 'bzrlib.doc')
3555
3600
test_prefix_alias_registry.register('bu', 'bzrlib.utils')
3556
3601
test_prefix_alias_registry.register('bt', 'bzrlib.tests')
3611
3656
        'bzrlib.tests.test_chunk_writer',
3612
3657
        'bzrlib.tests.test_clean_tree',
3613
3658
        'bzrlib.tests.test_cleanup',
 
3659
        'bzrlib.tests.test_cmdline',
3614
3660
        'bzrlib.tests.test_commands',
3615
3661
        'bzrlib.tests.test_commit',
3616
3662
        'bzrlib.tests.test_commit_merge',
3650
3696
        'bzrlib.tests.test_identitymap',
3651
3697
        'bzrlib.tests.test_ignores',
3652
3698
        'bzrlib.tests.test_index',
 
3699
        'bzrlib.tests.test_import_tariff',
3653
3700
        'bzrlib.tests.test_info',
3654
3701
        'bzrlib.tests.test_inv',
3655
3702
        'bzrlib.tests.test_inventory_delta',
4123
4170
    should really use a different feature.
4124
4171
    """
4125
4172
 
4126
 
    def __init__(self, module, name, this_name, dep_version):
 
4173
    def __init__(self, dep_version, module, name,
 
4174
                 replacement_name, replacement_module=None):
4127
4175
        super(_CompatabilityThunkFeature, self).__init__()
4128
4176
        self._module = module
 
4177
        if replacement_module is None:
 
4178
            replacement_module = module
 
4179
        self._replacement_module = replacement_module
4129
4180
        self._name = name
4130
 
        self._this_name = this_name
 
4181
        self._replacement_name = replacement_name
4131
4182
        self._dep_version = dep_version
4132
4183
        self._feature = None
4133
4184
 
4134
4185
    def _ensure(self):
4135
4186
        if self._feature is None:
4136
 
            msg = (self._dep_version % self._this_name) + (
4137
 
                   ' Use %s.%s instead.' % (self._module, self._name))
4138
 
            symbol_versioning.warn(msg, DeprecationWarning)
4139
 
            mod = __import__(self._module, {}, {}, [self._name])
4140
 
            self._feature = getattr(mod, self._name)
 
4187
            depr_msg = self._dep_version % ('%s.%s'
 
4188
                                            % (self._module, self._name))
 
4189
            use_msg = ' Use %s.%s instead.' % (self._replacement_module,
 
4190
                                               self._replacement_name)
 
4191
            symbol_versioning.warn(depr_msg + use_msg, DeprecationWarning)
 
4192
            # Import the new feature and use it as a replacement for the
 
4193
            # deprecated one.
 
4194
            mod = __import__(self._replacement_module, {}, {},
 
4195
                             [self._replacement_name])
 
4196
            self._feature = getattr(mod, self._replacement_name)
4141
4197
 
4142
4198
    def _probe(self):
4143
4199
        self._ensure()
4169
4225
        if self.available(): # Make sure the probe has been done
4170
4226
            return self._module
4171
4227
        return None
4172
 
    
 
4228
 
4173
4229
    def feature_name(self):
4174
4230
        return self.module_name
4175
4231
 
4176
4232
 
4177
4233
# This is kept here for compatibility, it is recommended to use
4178
4234
# 'bzrlib.tests.feature.paramiko' instead
4179
 
ParamikoFeature = _CompatabilityThunkFeature('bzrlib.tests.features',
4180
 
    'paramiko', 'bzrlib.tests.ParamikoFeature', deprecated_in((2,1,0)))
 
4235
ParamikoFeature = _CompatabilityThunkFeature(
 
4236
    deprecated_in((2,1,0)),
 
4237
    'bzrlib.tests.features', 'ParamikoFeature', 'paramiko')
4181
4238
 
4182
4239
 
4183
4240
def probe_unicode_in_user_encoding():
4362
4419
 
4363
4420
 
4364
4421
# Kept for compatibility, use bzrlib.tests.features.subunit instead
4365
 
SubUnitFeature = _CompatabilityThunkFeature('bzrlib.tests.features', 'subunit',
4366
 
    'bzrlib.tests.SubUnitFeature', deprecated_in((2,1,0)))
 
4422
SubUnitFeature = _CompatabilityThunkFeature(
 
4423
    deprecated_in((2,1,0)),
 
4424
    'bzrlib.tests.features', 'SubUnitFeature', 'subunit')
4367
4425
# Only define SubUnitBzrRunner if subunit is available.
4368
4426
try:
4369
4427
    from subunit import TestProtocolClient