~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_config.py

  • Committer: Jelmer Vernooij
  • Date: 2011-08-04 13:30:30 UTC
  • mfrom: (6050 +trunk)
  • mto: This revision was merged to the branch mainline in revision 6052.
  • Revision ID: jelmer@samba.org-20110804133030-uwo00unp8b0n782c
merge bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
37
37
    ui,
38
38
    urlutils,
39
39
    registry,
 
40
    remote,
40
41
    tests,
41
42
    trace,
42
43
    transport,
43
44
    )
 
45
from bzrlib.symbol_versioning import (
 
46
    deprecated_in,
 
47
    deprecated_method,
 
48
    )
 
49
from bzrlib.transport import remote as transport_remote
44
50
from bzrlib.tests import (
45
51
    features,
46
 
    TestSkipped,
47
52
    scenarios,
 
53
    test_server,
48
54
    )
49
55
from bzrlib.util.configobj import configobj
50
56
 
63
69
 
64
70
load_tests = scenarios.load_tests_apply_scenarios
65
71
 
66
 
# We need adapters that can build a config store in a test context. Test
67
 
# classes, based on TestCaseWithTransport, can use the registry to parametrize
68
 
# themselves. The builder will receive a test instance and should return a
69
 
# ready-to-use store.  Plugins that defines new stores can also register
70
 
# themselves here to be tested against the tests defined below.
71
 
 
72
 
# FIXME: plugins should *not* need to import test_config to register their
73
 
# helpers (or selftest -s xxx will be broken), the following registry should be
74
 
# moved to bzrlib.config instead so that selftest -s bt.test_config also runs
75
 
# the plugin specific tests (selftest -s bp.xxx won't, that would be against
76
 
# the spirit of '-s') -- vila 20110503
77
 
test_store_builder_registry = registry.Registry()
78
 
test_store_builder_registry.register(
 
72
# Register helpers to build stores
 
73
config.test_store_builder_registry.register(
79
74
    'configobj', lambda test: config.IniFileStore(test.get_transport(),
80
75
                                                  'configobj.conf'))
81
 
test_store_builder_registry.register(
 
76
config.test_store_builder_registry.register(
82
77
    'bazaar', lambda test: config.GlobalStore())
83
 
test_store_builder_registry.register(
 
78
config.test_store_builder_registry.register(
84
79
    'location', lambda test: config.LocationStore())
85
 
test_store_builder_registry.register(
86
 
    'branch', lambda test: config.BranchStore(test.branch))
87
 
 
88
 
# FIXME: Same remark as above for the following registry -- vila 20110503
89
 
test_stack_builder_registry = registry.Registry()
90
 
test_stack_builder_registry.register(
 
80
 
 
81
 
 
82
def build_backing_branch(test, relpath,
 
83
                         transport_class=None, server_class=None):
 
84
    """Test helper to create a backing branch only once.
 
85
 
 
86
    Some tests needs multiple stores/stacks to check concurrent update
 
87
    behaviours. As such, they need to build different branch *objects* even if
 
88
    they share the branch on disk.
 
89
 
 
90
    :param relpath: The relative path to the branch. (Note that the helper
 
91
        should always specify the same relpath).
 
92
 
 
93
    :param transport_class: The Transport class the test needs to use.
 
94
 
 
95
    :param server_class: The server associated with the ``transport_class``
 
96
        above.
 
97
 
 
98
    Either both or neither of ``transport_class`` and ``server_class`` should
 
99
    be specified.
 
100
    """
 
101
    if transport_class is not None and server_class is not None:
 
102
        test.transport_class = transport_class
 
103
        test.transport_server = server_class
 
104
    elif not (transport_class is None and server_class is None):
 
105
        raise AssertionError('Specify both ``transport_class`` and '
 
106
                             '``server_class`` or neither of them')
 
107
    if getattr(test, 'backing_branch', None) is None:
 
108
        # First call, let's build the branch on disk
 
109
        test.backing_branch = test.make_branch(relpath)
 
110
 
 
111
 
 
112
def build_branch_store(test):
 
113
    build_backing_branch(test, 'branch')
 
114
    b = branch.Branch.open('branch')
 
115
    return config.BranchStore(b)
 
116
config.test_store_builder_registry.register('branch', build_branch_store)
 
117
 
 
118
 
 
119
def build_remote_branch_store(test):
 
120
    # There is only one permutation (but we won't be able to handle more with
 
121
    # this design anyway)
 
122
    (transport_class,
 
123
     server_class) = transport_remote.get_test_permutations()[0]
 
124
    build_backing_branch(test, 'branch', transport_class, server_class)
 
125
    b = branch.Branch.open(test.get_url('branch'))
 
126
    return config.BranchStore(b)
 
127
config.test_store_builder_registry.register('remote_branch',
 
128
                                            build_remote_branch_store)
 
129
 
 
130
 
 
131
config.test_stack_builder_registry.register(
91
132
    'bazaar', lambda test: config.GlobalStack())
92
 
test_stack_builder_registry.register(
 
133
config.test_stack_builder_registry.register(
93
134
    'location', lambda test: config.LocationStack('.'))
94
 
test_stack_builder_registry.register(
95
 
    'branch', lambda test: config.BranchStack(test.branch))
 
135
 
 
136
 
 
137
def build_branch_stack(test):
 
138
    build_backing_branch(test, 'branch')
 
139
    b = branch.Branch.open('branch')
 
140
    return config.BranchStack(b)
 
141
config.test_stack_builder_registry.register('branch', build_branch_stack)
 
142
 
 
143
 
 
144
def build_remote_branch_stack(test):
 
145
    # There is only one permutation (but we won't be able to handle more with
 
146
    # this design anyway)
 
147
    (transport_class,
 
148
     server_class) = transport_remote.get_test_permutations()[0]
 
149
    build_backing_branch(test, 'branch', transport_class, server_class)
 
150
    b = branch.Branch.open(test.get_url('branch'))
 
151
    return config.BranchStack(b)
 
152
config.test_stack_builder_registry.register('remote_branch',
 
153
                                            build_remote_branch_stack)
96
154
 
97
155
 
98
156
sample_long_alias="log -r-15..-1 --line"
102
160
editor=vim
103
161
change_editor=vimdiff -of @new_path @old_path
104
162
gpg_signing_command=gnome-gpg
 
163
gpg_signing_key=DD4D5088
105
164
log_format=short
 
165
validate_signatures_in_log=true
 
166
acceptable_keys=amy
106
167
user_global_option=something
107
168
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
108
169
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
154
215
[/a/]
155
216
check_signatures=check-available
156
217
gpg_signing_command=false
 
218
gpg_signing_key=default
157
219
user_local_option=local
158
220
# test trailing / matching
159
221
[/a/*]
403
465
        config.Config()
404
466
 
405
467
    def test_no_default_editor(self):
406
 
        self.assertRaises(NotImplementedError, config.Config().get_editor)
 
468
        self.assertRaises(
 
469
            NotImplementedError,
 
470
            self.applyDeprecated, deprecated_in((2, 4, 0)),
 
471
            config.Config().get_editor)
407
472
 
408
473
    def test_user_email(self):
409
474
        my_config = InstrumentedConfig()
452
517
        my_config = config.Config()
453
518
        self.assertEqual('long', my_config.log_format())
454
519
 
 
520
    def test_acceptable_keys_default(self):
 
521
        my_config = config.Config()
 
522
        self.assertEqual(None, my_config.acceptable_keys())
 
523
 
 
524
    def test_validate_signatures_in_log_default(self):
 
525
        my_config = config.Config()
 
526
        self.assertEqual(False, my_config.validate_signatures_in_log())
 
527
 
455
528
    def test_get_change_editor(self):
456
529
        my_config = InstrumentedConfig()
457
530
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
638
711
    def test_default_is_True(self):
639
712
        self.config = self.get_config(True)
640
713
        self.assertExpandIs(True)
641
 
        
 
714
 
642
715
    def test_default_is_False(self):
643
716
        self.config = self.get_config(False)
644
717
        self.assertExpandIs(False)
645
 
        
 
718
 
646
719
 
647
720
class TestIniConfigOptionExpansion(tests.TestCase):
648
721
    """Test option expansion from the IniConfig level.
1117
1190
 
1118
1191
    def test_configured_editor(self):
1119
1192
        my_config = config.GlobalConfig.from_string(sample_config_text)
1120
 
        self.assertEqual("vim", my_config.get_editor())
 
1193
        editor = self.applyDeprecated(
 
1194
            deprecated_in((2, 4, 0)), my_config.get_editor)
 
1195
        self.assertEqual('vim', editor)
1121
1196
 
1122
1197
    def test_signatures_always(self):
1123
1198
        my_config = config.GlobalConfig.from_string(sample_always_signatures)
1152
1227
        self.assertEqual("gnome-gpg", my_config.gpg_signing_command())
1153
1228
        self.assertEqual(False, my_config.signature_needed())
1154
1229
 
 
1230
    def test_gpg_signing_key(self):
 
1231
        my_config = self._get_sample_config()
 
1232
        self.assertEqual("DD4D5088", my_config.gpg_signing_key())
 
1233
 
1155
1234
    def _get_empty_config(self):
1156
1235
        my_config = config.GlobalConfig()
1157
1236
        return my_config
1177
1256
        my_config = self._get_sample_config()
1178
1257
        self.assertEqual("short", my_config.log_format())
1179
1258
 
 
1259
    def test_configured_acceptable_keys(self):
 
1260
        my_config = self._get_sample_config()
 
1261
        self.assertEqual("amy", my_config.acceptable_keys())
 
1262
 
 
1263
    def test_configured_validate_signatures_in_log(self):
 
1264
        my_config = self._get_sample_config()
 
1265
        self.assertEqual(True, my_config.validate_signatures_in_log())
 
1266
 
1180
1267
    def test_get_alias(self):
1181
1268
        my_config = self._get_sample_config()
1182
1269
        self.assertEqual('help', my_config.get_alias('h'))
1439
1526
        self.get_branch_config('/a')
1440
1527
        self.assertEqual("false", self.my_config.gpg_signing_command())
1441
1528
 
 
1529
    def test_gpg_signing_key(self):
 
1530
        self.get_branch_config('/b')
 
1531
        self.assertEqual("DD4D5088", self.my_config.gpg_signing_key())
 
1532
 
 
1533
    def test_gpg_signing_key_default(self):
 
1534
        self.get_branch_config('/a')
 
1535
        self.assertEqual("erik@bagfors.nu", self.my_config.gpg_signing_key())
 
1536
 
1442
1537
    def test_get_user_option_global(self):
1443
1538
        self.get_branch_config('/a')
1444
1539
        self.assertEqual('something',
1808
1903
 
1809
1904
class TestTransportConfig(tests.TestCaseWithTransport):
1810
1905
 
 
1906
    def test_load_utf8(self):
 
1907
        """Ensure we can load an utf8-encoded file."""
 
1908
        t = self.get_transport()
 
1909
        unicode_user = u'b\N{Euro Sign}ar'
 
1910
        unicode_content = u'user=%s' % (unicode_user,)
 
1911
        utf8_content = unicode_content.encode('utf8')
 
1912
        # Store the raw content in the config file
 
1913
        t.put_bytes('foo.conf', utf8_content)
 
1914
        conf = config.TransportConfig(t, 'foo.conf')
 
1915
        self.assertEquals(unicode_user, conf.get_option('user'))
 
1916
 
 
1917
    def test_load_non_ascii(self):
 
1918
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
1919
        t = self.get_transport()
 
1920
        t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
 
1921
        conf = config.TransportConfig(t, 'foo.conf')
 
1922
        self.assertRaises(errors.ConfigContentError, conf._get_configobj)
 
1923
 
 
1924
    def test_load_erroneous_content(self):
 
1925
        """Ensure we display a proper error on content that can't be parsed."""
 
1926
        t = self.get_transport()
 
1927
        t.put_bytes('foo.conf', '[open_section\n')
 
1928
        conf = config.TransportConfig(t, 'foo.conf')
 
1929
        self.assertRaises(errors.ParseConfigError, conf._get_configobj)
 
1930
 
1811
1931
    def test_get_value(self):
1812
1932
        """Test that retreiving a value from a section is possible"""
1813
 
        bzrdir_config = config.TransportConfig(transport.get_transport('.'),
 
1933
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
1814
1934
                                               'control.conf')
1815
1935
        bzrdir_config.set_option('value', 'key', 'SECTION')
1816
1936
        bzrdir_config.set_option('value2', 'key2')
1846
1966
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1847
1967
 
1848
1968
 
 
1969
class TestOldConfigHooks(tests.TestCaseWithTransport):
 
1970
 
 
1971
    def setUp(self):
 
1972
        super(TestOldConfigHooks, self).setUp()
 
1973
        create_configs_with_file_option(self)
 
1974
 
 
1975
    def assertGetHook(self, conf, name, value):
 
1976
        calls = []
 
1977
        def hook(*args):
 
1978
            calls.append(args)
 
1979
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
1980
        self.addCleanup(
 
1981
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
1982
        self.assertLength(0, calls)
 
1983
        actual_value = conf.get_user_option(name)
 
1984
        self.assertEquals(value, actual_value)
 
1985
        self.assertLength(1, calls)
 
1986
        self.assertEquals((conf, name, value), calls[0])
 
1987
 
 
1988
    def test_get_hook_bazaar(self):
 
1989
        self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
 
1990
 
 
1991
    def test_get_hook_locations(self):
 
1992
        self.assertGetHook(self.locations_config, 'file', 'locations')
 
1993
 
 
1994
    def test_get_hook_branch(self):
 
1995
        # Since locations masks branch, we define a different option
 
1996
        self.branch_config.set_user_option('file2', 'branch')
 
1997
        self.assertGetHook(self.branch_config, 'file2', 'branch')
 
1998
 
 
1999
    def assertSetHook(self, conf, name, value):
 
2000
        calls = []
 
2001
        def hook(*args):
 
2002
            calls.append(args)
 
2003
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
2004
        self.addCleanup(
 
2005
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
2006
        self.assertLength(0, calls)
 
2007
        conf.set_user_option(name, value)
 
2008
        self.assertLength(1, calls)
 
2009
        # We can't assert the conf object below as different configs use
 
2010
        # different means to implement set_user_option and we care only about
 
2011
        # coverage here.
 
2012
        self.assertEquals((name, value), calls[0][1:])
 
2013
 
 
2014
    def test_set_hook_bazaar(self):
 
2015
        self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
 
2016
 
 
2017
    def test_set_hook_locations(self):
 
2018
        self.assertSetHook(self.locations_config, 'foo', 'locations')
 
2019
 
 
2020
    def test_set_hook_branch(self):
 
2021
        self.assertSetHook(self.branch_config, 'foo', 'branch')
 
2022
 
 
2023
    def assertRemoveHook(self, conf, name, section_name=None):
 
2024
        calls = []
 
2025
        def hook(*args):
 
2026
            calls.append(args)
 
2027
        config.OldConfigHooks.install_named_hook('remove', hook, None)
 
2028
        self.addCleanup(
 
2029
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
 
2030
        self.assertLength(0, calls)
 
2031
        conf.remove_user_option(name, section_name)
 
2032
        self.assertLength(1, calls)
 
2033
        # We can't assert the conf object below as different configs use
 
2034
        # different means to implement remove_user_option and we care only about
 
2035
        # coverage here.
 
2036
        self.assertEquals((name,), calls[0][1:])
 
2037
 
 
2038
    def test_remove_hook_bazaar(self):
 
2039
        self.assertRemoveHook(self.bazaar_config, 'file')
 
2040
 
 
2041
    def test_remove_hook_locations(self):
 
2042
        self.assertRemoveHook(self.locations_config, 'file',
 
2043
                              self.locations_config.location)
 
2044
 
 
2045
    def test_remove_hook_branch(self):
 
2046
        self.assertRemoveHook(self.branch_config, 'file')
 
2047
 
 
2048
    def assertLoadHook(self, name, conf_class, *conf_args):
 
2049
        calls = []
 
2050
        def hook(*args):
 
2051
            calls.append(args)
 
2052
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
2053
        self.addCleanup(
 
2054
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
2055
        self.assertLength(0, calls)
 
2056
        # Build a config
 
2057
        conf = conf_class(*conf_args)
 
2058
        # Access an option to trigger a load
 
2059
        conf.get_user_option(name)
 
2060
        self.assertLength(1, calls)
 
2061
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2062
 
 
2063
    def test_load_hook_bazaar(self):
 
2064
        self.assertLoadHook('file', config.GlobalConfig)
 
2065
 
 
2066
    def test_load_hook_locations(self):
 
2067
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
 
2068
 
 
2069
    def test_load_hook_branch(self):
 
2070
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
 
2071
 
 
2072
    def assertSaveHook(self, conf):
 
2073
        calls = []
 
2074
        def hook(*args):
 
2075
            calls.append(args)
 
2076
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
2077
        self.addCleanup(
 
2078
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
2079
        self.assertLength(0, calls)
 
2080
        # Setting an option triggers a save
 
2081
        conf.set_user_option('foo', 'bar')
 
2082
        self.assertLength(1, calls)
 
2083
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2084
 
 
2085
    def test_save_hook_bazaar(self):
 
2086
        self.assertSaveHook(self.bazaar_config)
 
2087
 
 
2088
    def test_save_hook_locations(self):
 
2089
        self.assertSaveHook(self.locations_config)
 
2090
 
 
2091
    def test_save_hook_branch(self):
 
2092
        self.assertSaveHook(self.branch_config)
 
2093
 
 
2094
 
 
2095
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
 
2096
    """Tests config hooks for remote configs.
 
2097
 
 
2098
    No tests for the remove hook as this is not implemented there.
 
2099
    """
 
2100
 
 
2101
    def setUp(self):
 
2102
        super(TestOldConfigHooksForRemote, self).setUp()
 
2103
        self.transport_server = test_server.SmartTCPServer_for_testing
 
2104
        create_configs_with_file_option(self)
 
2105
 
 
2106
    def assertGetHook(self, conf, name, value):
 
2107
        calls = []
 
2108
        def hook(*args):
 
2109
            calls.append(args)
 
2110
        config.OldConfigHooks.install_named_hook('get', hook, None)
 
2111
        self.addCleanup(
 
2112
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
 
2113
        self.assertLength(0, calls)
 
2114
        actual_value = conf.get_option(name)
 
2115
        self.assertEquals(value, actual_value)
 
2116
        self.assertLength(1, calls)
 
2117
        self.assertEquals((conf, name, value), calls[0])
 
2118
 
 
2119
    def test_get_hook_remote_branch(self):
 
2120
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2121
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
 
2122
 
 
2123
    def test_get_hook_remote_bzrdir(self):
 
2124
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2125
        conf = remote_bzrdir._get_config()
 
2126
        conf.set_option('remotedir', 'file')
 
2127
        self.assertGetHook(conf, 'file', 'remotedir')
 
2128
 
 
2129
    def assertSetHook(self, conf, name, value):
 
2130
        calls = []
 
2131
        def hook(*args):
 
2132
            calls.append(args)
 
2133
        config.OldConfigHooks.install_named_hook('set', hook, None)
 
2134
        self.addCleanup(
 
2135
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
 
2136
        self.assertLength(0, calls)
 
2137
        conf.set_option(value, name)
 
2138
        self.assertLength(1, calls)
 
2139
        # We can't assert the conf object below as different configs use
 
2140
        # different means to implement set_user_option and we care only about
 
2141
        # coverage here.
 
2142
        self.assertEquals((name, value), calls[0][1:])
 
2143
 
 
2144
    def test_set_hook_remote_branch(self):
 
2145
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2146
        self.addCleanup(remote_branch.lock_write().unlock)
 
2147
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
 
2148
 
 
2149
    def test_set_hook_remote_bzrdir(self):
 
2150
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2151
        self.addCleanup(remote_branch.lock_write().unlock)
 
2152
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2153
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
 
2154
 
 
2155
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
 
2156
        calls = []
 
2157
        def hook(*args):
 
2158
            calls.append(args)
 
2159
        config.OldConfigHooks.install_named_hook('load', hook, None)
 
2160
        self.addCleanup(
 
2161
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
 
2162
        self.assertLength(0, calls)
 
2163
        # Build a config
 
2164
        conf = conf_class(*conf_args)
 
2165
        # Access an option to trigger a load
 
2166
        conf.get_option(name)
 
2167
        self.assertLength(expected_nb_calls, calls)
 
2168
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2169
 
 
2170
    def test_load_hook_remote_branch(self):
 
2171
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2172
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
 
2173
 
 
2174
    def test_load_hook_remote_bzrdir(self):
 
2175
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2176
        # The config file doesn't exist, set an option to force its creation
 
2177
        conf = remote_bzrdir._get_config()
 
2178
        conf.set_option('remotedir', 'file')
 
2179
        # We get one call for the server and one call for the client, this is
 
2180
        # caused by the differences in implementations betwen
 
2181
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
 
2182
        # SmartServerBranchGetConfigFile (in smart/branch.py)
 
2183
        self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
 
2184
 
 
2185
    def assertSaveHook(self, conf):
 
2186
        calls = []
 
2187
        def hook(*args):
 
2188
            calls.append(args)
 
2189
        config.OldConfigHooks.install_named_hook('save', hook, None)
 
2190
        self.addCleanup(
 
2191
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
 
2192
        self.assertLength(0, calls)
 
2193
        # Setting an option triggers a save
 
2194
        conf.set_option('foo', 'bar')
 
2195
        self.assertLength(1, calls)
 
2196
        # Since we can't assert about conf, we just use the number of calls ;-/
 
2197
 
 
2198
    def test_save_hook_remote_branch(self):
 
2199
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2200
        self.addCleanup(remote_branch.lock_write().unlock)
 
2201
        self.assertSaveHook(remote_branch._get_config())
 
2202
 
 
2203
    def test_save_hook_remote_bzrdir(self):
 
2204
        remote_branch = branch.Branch.open(self.get_url('tree'))
 
2205
        self.addCleanup(remote_branch.lock_write().unlock)
 
2206
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
 
2207
        self.assertSaveHook(remote_bzrdir._get_config())
 
2208
 
 
2209
 
 
2210
class TestOption(tests.TestCase):
 
2211
 
 
2212
    def test_default_value(self):
 
2213
        opt = config.Option('foo', default='bar')
 
2214
        self.assertEquals('bar', opt.get_default())
 
2215
 
 
2216
 
 
2217
class TestOptionRegistry(tests.TestCase):
 
2218
 
 
2219
    def setUp(self):
 
2220
        super(TestOptionRegistry, self).setUp()
 
2221
        # Always start with an empty registry
 
2222
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2223
        self.registry = config.option_registry
 
2224
 
 
2225
    def test_register(self):
 
2226
        opt = config.Option('foo')
 
2227
        self.registry.register('foo', opt)
 
2228
        self.assertIs(opt, self.registry.get('foo'))
 
2229
 
 
2230
    lazy_option = config.Option('lazy_foo')
 
2231
 
 
2232
    def test_register_lazy(self):
 
2233
        self.registry.register_lazy('foo', self.__module__,
 
2234
                                    'TestOptionRegistry.lazy_option')
 
2235
        self.assertIs(self.lazy_option, self.registry.get('foo'))
 
2236
 
 
2237
    def test_registered_help(self):
 
2238
        opt = config.Option('foo')
 
2239
        self.registry.register('foo', opt, help='A simple option')
 
2240
        self.assertEquals('A simple option', self.registry.get_help('foo'))
 
2241
 
 
2242
 
 
2243
class TestRegisteredOptions(tests.TestCase):
 
2244
    """All registered options should verify some constraints."""
 
2245
 
 
2246
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
 
2247
                 in config.option_registry.iteritems()]
 
2248
 
 
2249
    def setUp(self):
 
2250
        super(TestRegisteredOptions, self).setUp()
 
2251
        self.registry = config.option_registry
 
2252
 
 
2253
    def test_proper_name(self):
 
2254
        # An option should be registered under its own name, this can't be
 
2255
        # checked at registration time for the lazy ones.
 
2256
        self.assertEquals(self.option_name, self.option.name)
 
2257
 
 
2258
    def test_help_is_set(self):
 
2259
        option_help = self.registry.get_help(self.option_name)
 
2260
        self.assertNotEquals(None, option_help)
 
2261
        # Come on, think about the user, he really wants to know whst the
 
2262
        # option is about
 
2263
        self.assertNotEquals('', option_help)
 
2264
 
 
2265
 
1849
2266
class TestSection(tests.TestCase):
1850
2267
 
1851
2268
    # FIXME: Parametrize so that all sections produced by Stores run these
1931
2348
 
1932
2349
class TestReadonlyStore(TestStore):
1933
2350
 
1934
 
    scenarios = [(key, {'get_store': builder})
1935
 
                 for key, builder in test_store_builder_registry.iteritems()]
 
2351
    scenarios = [(key, {'get_store': builder}) for key, builder
 
2352
                 in config.test_store_builder_registry.iteritems()]
1936
2353
 
1937
2354
    def setUp(self):
1938
2355
        super(TestReadonlyStore, self).setUp()
1939
 
        self.branch = self.make_branch('branch')
1940
2356
 
1941
2357
    def test_building_delays_load(self):
1942
2358
        store = self.get_store(self)
1969
2385
        self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
1970
2386
 
1971
2387
 
 
2388
class TestIniFileStoreContent(tests.TestCaseWithTransport):
 
2389
    """Simulate loading a config store without content of various encodings.
 
2390
 
 
2391
    All files produced by bzr are in utf8 content.
 
2392
 
 
2393
    Users may modify them manually and end up with a file that can't be
 
2394
    loaded. We need to issue proper error messages in this case.
 
2395
    """
 
2396
 
 
2397
    invalid_utf8_char = '\xff'
 
2398
 
 
2399
    def test_load_utf8(self):
 
2400
        """Ensure we can load an utf8-encoded file."""
 
2401
        t = self.get_transport()
 
2402
        # From http://pad.lv/799212
 
2403
        unicode_user = u'b\N{Euro Sign}ar'
 
2404
        unicode_content = u'user=%s' % (unicode_user,)
 
2405
        utf8_content = unicode_content.encode('utf8')
 
2406
        # Store the raw content in the config file
 
2407
        t.put_bytes('foo.conf', utf8_content)
 
2408
        store = config.IniFileStore(t, 'foo.conf')
 
2409
        store.load()
 
2410
        stack = config.Stack([store.get_sections], store)
 
2411
        self.assertEquals(unicode_user, stack.get('user'))
 
2412
 
 
2413
    def test_load_non_ascii(self):
 
2414
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2415
        t = self.get_transport()
 
2416
        t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2417
        store = config.IniFileStore(t, 'foo.conf')
 
2418
        self.assertRaises(errors.ConfigContentError, store.load)
 
2419
 
 
2420
    def test_load_erroneous_content(self):
 
2421
        """Ensure we display a proper error on content that can't be parsed."""
 
2422
        t = self.get_transport()
 
2423
        t.put_bytes('foo.conf', '[open_section\n')
 
2424
        store = config.IniFileStore(t, 'foo.conf')
 
2425
        self.assertRaises(errors.ParseConfigError, store.load)
 
2426
 
 
2427
 
 
2428
class TestIniConfigContent(tests.TestCaseWithTransport):
 
2429
    """Simulate loading a IniBasedConfig without content of various encodings.
 
2430
 
 
2431
    All files produced by bzr are in utf8 content.
 
2432
 
 
2433
    Users may modify them manually and end up with a file that can't be
 
2434
    loaded. We need to issue proper error messages in this case.
 
2435
    """
 
2436
 
 
2437
    invalid_utf8_char = '\xff'
 
2438
 
 
2439
    def test_load_utf8(self):
 
2440
        """Ensure we can load an utf8-encoded file."""
 
2441
        # From http://pad.lv/799212
 
2442
        unicode_user = u'b\N{Euro Sign}ar'
 
2443
        unicode_content = u'user=%s' % (unicode_user,)
 
2444
        utf8_content = unicode_content.encode('utf8')
 
2445
        # Store the raw content in the config file
 
2446
        with open('foo.conf', 'wb') as f:
 
2447
            f.write(utf8_content)
 
2448
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2449
        self.assertEquals(unicode_user, conf.get_user_option('user'))
 
2450
 
 
2451
    def test_load_badly_encoded_content(self):
 
2452
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
 
2453
        with open('foo.conf', 'wb') as f:
 
2454
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
 
2455
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2456
        self.assertRaises(errors.ConfigContentError, conf._get_parser)
 
2457
 
 
2458
    def test_load_erroneous_content(self):
 
2459
        """Ensure we display a proper error on content that can't be parsed."""
 
2460
        with open('foo.conf', 'wb') as f:
 
2461
            f.write('[open_section\n')
 
2462
        conf = config.IniBasedConfig(file_name='foo.conf')
 
2463
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
 
2464
 
 
2465
 
1972
2466
class TestMutableStore(TestStore):
1973
2467
 
1974
 
    scenarios = [(key, {'store_id': key, 'get_store': builder})
1975
 
                 for key, builder in test_store_builder_registry.iteritems()]
 
2468
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
 
2469
                 in config.test_store_builder_registry.iteritems()]
1976
2470
 
1977
2471
    def setUp(self):
1978
2472
        super(TestMutableStore, self).setUp()
1979
2473
        self.transport = self.get_transport()
1980
 
        self.branch = self.make_branch('branch')
1981
2474
 
1982
2475
    def has_store(self, store):
1983
2476
        store_basename = urlutils.relative_url(self.transport.external_url(),
1985
2478
        return self.transport.has(store_basename)
1986
2479
 
1987
2480
    def test_save_empty_creates_no_file(self):
1988
 
        if self.store_id == 'branch':
 
2481
        # FIXME: There should be a better way than relying on the test
 
2482
        # parametrization to identify branch.conf -- vila 2011-0526
 
2483
        if self.store_id in ('branch', 'remote_branch'):
1989
2484
            raise tests.TestNotApplicable(
1990
2485
                'branch.conf is *always* created when a branch is initialized')
1991
2486
        store = self.get_store(self)
2004
2499
        self.assertLength(0, sections)
2005
2500
 
2006
2501
    def test_save_with_content_succeeds(self):
2007
 
        if self.store_id == 'branch':
 
2502
        # FIXME: There should be a better way than relying on the test
 
2503
        # parametrization to identify branch.conf -- vila 2011-0526
 
2504
        if self.store_id in ('branch', 'remote_branch'):
2008
2505
            raise tests.TestNotApplicable(
2009
2506
                'branch.conf is *always* created when a branch is initialized')
2010
2507
        store = self.get_store(self)
2049
2546
        self.assertLength(1, sections)
2050
2547
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2051
2548
 
 
2549
    def test_load_hook(self):
 
2550
        # We first needs to ensure that the store exists
 
2551
        store = self.get_store(self)
 
2552
        section = store.get_mutable_section('baz')
 
2553
        section.set('foo', 'bar')
 
2554
        store.save()
 
2555
        # Now we can try to load it
 
2556
        store = self.get_store(self)
 
2557
        calls = []
 
2558
        def hook(*args):
 
2559
            calls.append(args)
 
2560
        config.ConfigHooks.install_named_hook('load', hook, None)
 
2561
        self.assertLength(0, calls)
 
2562
        store.load()
 
2563
        self.assertLength(1, calls)
 
2564
        self.assertEquals((store,), calls[0])
 
2565
 
 
2566
    def test_save_hook(self):
 
2567
        calls = []
 
2568
        def hook(*args):
 
2569
            calls.append(args)
 
2570
        config.ConfigHooks.install_named_hook('save', hook, None)
 
2571
        self.assertLength(0, calls)
 
2572
        store = self.get_store(self)
 
2573
        section = store.get_mutable_section('baz')
 
2574
        section.set('foo', 'bar')
 
2575
        store.save()
 
2576
        self.assertLength(1, calls)
 
2577
        self.assertEquals((store,), calls[0])
 
2578
 
2052
2579
 
2053
2580
class TestIniFileStore(TestStore):
2054
2581
 
2103
2630
class TestLockableIniFileStore(TestStore):
2104
2631
 
2105
2632
    def test_create_store_in_created_dir(self):
 
2633
        self.assertPathDoesNotExist('dir')
2106
2634
        t = self.get_transport('dir/subdir')
2107
2635
        store = config.LockableIniFileStore(t, 'foo.conf')
2108
2636
        store.get_mutable_section(None).set('foo', 'bar')
2109
2637
        store.save()
2110
 
 
2111
 
    # FIXME: We should adapt the tests in TestLockableConfig about concurrent
2112
 
    # writes. Since this requires a clearer rewrite, I'll just rely on using
2113
 
    # the same code in LockableIniFileStore (copied from LockableConfig, but
2114
 
    # trivial enough, the main difference is that we add @needs_write_lock on
2115
 
    # save() instead of set_user_option() and remove_user_option()). The intent
2116
 
    # is to ensure that we always get a valid content for the store even when
2117
 
    # concurrent accesses occur, read/write, write/write. It may be worth
2118
 
    # looking into removing the lock dir when it;s not needed anymore and look
2119
 
    # at possible fallouts for concurrent lockers -- vila 20110-04-06
 
2638
        self.assertPathExists('dir/subdir')
 
2639
 
 
2640
 
 
2641
class TestConcurrentStoreUpdates(TestStore):
 
2642
    """Test that Stores properly handle conccurent updates.
 
2643
 
 
2644
    New Store implementation may fail some of these tests but until such
 
2645
    implementations exist it's hard to properly filter them from the scenarios
 
2646
    applied here. If you encounter such a case, contact the bzr devs.
 
2647
    """
 
2648
 
 
2649
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2650
                 in config.test_stack_builder_registry.iteritems()]
 
2651
 
 
2652
    def setUp(self):
 
2653
        super(TestConcurrentStoreUpdates, self).setUp()
 
2654
        self._content = 'one=1\ntwo=2\n'
 
2655
        self.stack = self.get_stack(self)
 
2656
        if not isinstance(self.stack, config._CompatibleStack):
 
2657
            raise tests.TestNotApplicable(
 
2658
                '%s is not meant to be compatible with the old config design'
 
2659
                % (self.stack,))
 
2660
        self.stack.store._load_from_string(self._content)
 
2661
        # Flush the store
 
2662
        self.stack.store.save()
 
2663
 
 
2664
    def test_simple_read_access(self):
 
2665
        self.assertEquals('1', self.stack.get('one'))
 
2666
 
 
2667
    def test_simple_write_access(self):
 
2668
        self.stack.set('one', 'one')
 
2669
        self.assertEquals('one', self.stack.get('one'))
 
2670
 
 
2671
    def test_listen_to_the_last_speaker(self):
 
2672
        c1 = self.stack
 
2673
        c2 = self.get_stack(self)
 
2674
        c1.set('one', 'ONE')
 
2675
        c2.set('two', 'TWO')
 
2676
        self.assertEquals('ONE', c1.get('one'))
 
2677
        self.assertEquals('TWO', c2.get('two'))
 
2678
        # The second update respect the first one
 
2679
        self.assertEquals('ONE', c2.get('one'))
 
2680
 
 
2681
    def test_last_speaker_wins(self):
 
2682
        # If the same config is not shared, the same variable modified twice
 
2683
        # can only see a single result.
 
2684
        c1 = self.stack
 
2685
        c2 = self.get_stack(self)
 
2686
        c1.set('one', 'c1')
 
2687
        c2.set('one', 'c2')
 
2688
        self.assertEquals('c2', c2.get('one'))
 
2689
        # The first modification is still available until another refresh
 
2690
        # occur
 
2691
        self.assertEquals('c1', c1.get('one'))
 
2692
        c1.set('two', 'done')
 
2693
        self.assertEquals('c2', c1.get('one'))
 
2694
 
 
2695
    def test_writes_are_serialized(self):
 
2696
        c1 = self.stack
 
2697
        c2 = self.get_stack(self)
 
2698
 
 
2699
        # We spawn a thread that will pause *during* the config saving.
 
2700
        before_writing = threading.Event()
 
2701
        after_writing = threading.Event()
 
2702
        writing_done = threading.Event()
 
2703
        c1_save_without_locking_orig = c1.store.save_without_locking
 
2704
        def c1_save_without_locking():
 
2705
            before_writing.set()
 
2706
            c1_save_without_locking_orig()
 
2707
            # The lock is held. We wait for the main thread to decide when to
 
2708
            # continue
 
2709
            after_writing.wait()
 
2710
        c1.store.save_without_locking = c1_save_without_locking
 
2711
        def c1_set():
 
2712
            c1.set('one', 'c1')
 
2713
            writing_done.set()
 
2714
        t1 = threading.Thread(target=c1_set)
 
2715
        # Collect the thread after the test
 
2716
        self.addCleanup(t1.join)
 
2717
        # Be ready to unblock the thread if the test goes wrong
 
2718
        self.addCleanup(after_writing.set)
 
2719
        t1.start()
 
2720
        before_writing.wait()
 
2721
        self.assertRaises(errors.LockContention,
 
2722
                          c2.set, 'one', 'c2')
 
2723
        self.assertEquals('c1', c1.get('one'))
 
2724
        # Let the lock be released
 
2725
        after_writing.set()
 
2726
        writing_done.wait()
 
2727
        c2.set('one', 'c2')
 
2728
        self.assertEquals('c2', c2.get('one'))
 
2729
 
 
2730
    def test_read_while_writing(self):
 
2731
       c1 = self.stack
 
2732
       # We spawn a thread that will pause *during* the write
 
2733
       ready_to_write = threading.Event()
 
2734
       do_writing = threading.Event()
 
2735
       writing_done = threading.Event()
 
2736
       # We override the _save implementation so we know the store is locked
 
2737
       c1_save_without_locking_orig = c1.store.save_without_locking
 
2738
       def c1_save_without_locking():
 
2739
           ready_to_write.set()
 
2740
           # The lock is held. We wait for the main thread to decide when to
 
2741
           # continue
 
2742
           do_writing.wait()
 
2743
           c1_save_without_locking_orig()
 
2744
           writing_done.set()
 
2745
       c1.store.save_without_locking = c1_save_without_locking
 
2746
       def c1_set():
 
2747
           c1.set('one', 'c1')
 
2748
       t1 = threading.Thread(target=c1_set)
 
2749
       # Collect the thread after the test
 
2750
       self.addCleanup(t1.join)
 
2751
       # Be ready to unblock the thread if the test goes wrong
 
2752
       self.addCleanup(do_writing.set)
 
2753
       t1.start()
 
2754
       # Ensure the thread is ready to write
 
2755
       ready_to_write.wait()
 
2756
       self.assertEquals('c1', c1.get('one'))
 
2757
       # If we read during the write, we get the old value
 
2758
       c2 = self.get_stack(self)
 
2759
       self.assertEquals('1', c2.get('one'))
 
2760
       # Let the writing occur and ensure it occurred
 
2761
       do_writing.set()
 
2762
       writing_done.wait()
 
2763
       # Now we get the updated value
 
2764
       c3 = self.get_stack(self)
 
2765
       self.assertEquals('c1', c3.get('one'))
 
2766
 
 
2767
    # FIXME: It may be worth looking into removing the lock dir when it's not
 
2768
    # needed anymore and look at possible fallouts for concurrent lockers. This
 
2769
    # will matter if/when we use config files outside of bazaar directories
 
2770
    # (.bazaar or .bzr) -- vila 20110-04-11
2120
2771
 
2121
2772
 
2122
2773
class TestSectionMatcher(TestStore):
2200
2851
 
2201
2852
    def test_file_urls_are_normalized(self):
2202
2853
        store = self.get_store('foo.conf')
2203
 
        matcher = config.LocationMatcher(store, 'file:///dir/subdir')
2204
 
        self.assertEquals('/dir/subdir', matcher.location)
 
2854
        if sys.platform == 'win32':
 
2855
            expected_url = 'file:///C:/dir/subdir'
 
2856
            expected_location = 'C:/dir/subdir'
 
2857
        else:
 
2858
            expected_url = 'file:///dir/subdir'
 
2859
            expected_location = '/dir/subdir'
 
2860
        matcher = config.LocationMatcher(store, expected_url)
 
2861
        self.assertEquals(expected_location, matcher.location)
2205
2862
 
2206
2863
 
2207
2864
class TestStackGet(tests.TestCase):
2214
2871
        conf_stack = config.Stack([conf])
2215
2872
        self.assertEquals('bar', conf_stack.get('foo'))
2216
2873
 
 
2874
    def test_get_with_registered_default_value(self):
 
2875
        conf_stack = config.Stack([dict()])
 
2876
        opt = config.Option('foo', default='bar')
 
2877
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2878
        config.option_registry.register('foo', opt)
 
2879
        self.assertEquals('bar', conf_stack.get('foo'))
 
2880
 
 
2881
    def test_get_without_registered_default_value(self):
 
2882
        conf_stack = config.Stack([dict()])
 
2883
        opt = config.Option('foo')
 
2884
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2885
        config.option_registry.register('foo', opt)
 
2886
        self.assertEquals(None, conf_stack.get('foo'))
 
2887
 
 
2888
    def test_get_without_default_value_for_not_registered(self):
 
2889
        conf_stack = config.Stack([dict()])
 
2890
        opt = config.Option('foo')
 
2891
        self.overrideAttr(config, 'option_registry', registry.Registry())
 
2892
        self.assertEquals(None, conf_stack.get('foo'))
 
2893
 
2217
2894
    def test_get_first_definition(self):
2218
2895
        conf1 = dict(foo='bar')
2219
2896
        conf2 = dict(foo='baz')
2226
2903
        conf_stack = config.Stack([conf1, conf2])
2227
2904
        self.assertEquals('baz', conf_stack.get('foo'))
2228
2905
 
2229
 
    def test_get_for_empty_stack(self):
2230
 
        conf_stack = config.Stack([])
2231
 
        self.assertEquals(None, conf_stack.get('foo'))
2232
 
 
2233
2906
    def test_get_for_empty_section_callable(self):
2234
2907
        conf_stack = config.Stack([lambda : []])
2235
2908
        self.assertEquals(None, conf_stack.get('foo'))
2242
2915
 
2243
2916
class TestStackWithTransport(tests.TestCaseWithTransport):
2244
2917
 
2245
 
    def setUp(self):
2246
 
        super(TestStackWithTransport, self).setUp()
2247
 
        # FIXME: A more elaborate builder for the stack would avoid building a
2248
 
        # branch even for tests that don't need it.
2249
 
        self.branch = self.make_branch('branch')
 
2918
    scenarios = [(key, {'get_stack': builder}) for key, builder
 
2919
                 in config.test_stack_builder_registry.iteritems()]
 
2920
 
 
2921
 
 
2922
class TestConcreteStacks(TestStackWithTransport):
 
2923
 
 
2924
    def test_build_stack(self):
 
2925
        # Just a smoke test to help debug builders
 
2926
        stack = self.get_stack(self)
 
2927
 
 
2928
 
 
2929
class TestStackGet(TestStackWithTransport):
 
2930
 
 
2931
    def test_get_for_empty_stack(self):
 
2932
        conf = self.get_stack(self)
 
2933
        self.assertEquals(None, conf.get('foo'))
 
2934
 
 
2935
    def test_get_hook(self):
 
2936
        conf = self.get_stack(self)
 
2937
        conf.store._load_from_string('foo=bar')
 
2938
        calls = []
 
2939
        def hook(*args):
 
2940
            calls.append(args)
 
2941
        config.ConfigHooks.install_named_hook('get', hook, None)
 
2942
        self.assertLength(0, calls)
 
2943
        value = conf.get('foo')
 
2944
        self.assertEquals('bar', value)
 
2945
        self.assertLength(1, calls)
 
2946
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2250
2947
 
2251
2948
 
2252
2949
class TestStackSet(TestStackWithTransport):
2253
2950
 
2254
 
    scenarios = [(key, {'get_stack': builder})
2255
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2256
 
 
2257
2951
    def test_simple_set(self):
2258
2952
        conf = self.get_stack(self)
2259
2953
        conf.store._load_from_string('foo=bar')
2267
2961
        conf.set('foo', 'baz')
2268
2962
        self.assertEquals, 'baz', conf.get('foo')
2269
2963
 
 
2964
    def test_set_hook(self):
 
2965
        calls = []
 
2966
        def hook(*args):
 
2967
            calls.append(args)
 
2968
        config.ConfigHooks.install_named_hook('set', hook, None)
 
2969
        self.assertLength(0, calls)
 
2970
        conf = self.get_stack(self)
 
2971
        conf.set('foo', 'bar')
 
2972
        self.assertLength(1, calls)
 
2973
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
 
2974
 
2270
2975
 
2271
2976
class TestStackRemove(TestStackWithTransport):
2272
2977
 
2273
 
    scenarios = [(key, {'get_stack': builder})
2274
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2275
 
 
2276
2978
    def test_remove_existing(self):
2277
2979
        conf = self.get_stack(self)
2278
2980
        conf.store._load_from_string('foo=bar')
2285
2987
        conf = self.get_stack(self)
2286
2988
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
2287
2989
 
2288
 
 
2289
 
class TestConcreteStacks(TestStackWithTransport):
2290
 
 
2291
 
    scenarios = [(key, {'get_stack': builder})
2292
 
                 for key, builder in test_stack_builder_registry.iteritems()]
2293
 
 
2294
 
    def test_build_stack(self):
2295
 
        stack = self.get_stack(self)
 
2990
    def test_remove_hook(self):
 
2991
        calls = []
 
2992
        def hook(*args):
 
2993
            calls.append(args)
 
2994
        config.ConfigHooks.install_named_hook('remove', hook, None)
 
2995
        self.assertLength(0, calls)
 
2996
        conf = self.get_stack(self)
 
2997
        conf.store._load_from_string('foo=bar')
 
2998
        conf.remove('foo')
 
2999
        self.assertLength(1, calls)
 
3000
        self.assertEquals((conf, 'foo'), calls[0])
2296
3001
 
2297
3002
 
2298
3003
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
2466
3171
        self.assertEquals({}, conf._get_config())
2467
3172
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
2468
3173
 
 
3174
    def test_non_utf8_config(self):
 
3175
        conf = config.AuthenticationConfig(_file=StringIO(
 
3176
                'foo = bar\xff'))
 
3177
        self.assertRaises(errors.ConfigContentError, conf._get_config)
 
3178
        
2469
3179
    def test_missing_auth_section_header(self):
2470
3180
        conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
2471
3181
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
2729
3439
 
2730
3440
    def test_username_defaults_prompts(self):
2731
3441
        # HTTP prompts can't be tested here, see test_http.py
2732
 
        self._check_default_username_prompt('FTP %(host)s username: ', 'ftp')
2733
 
        self._check_default_username_prompt(
2734
 
            'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
2735
 
        self._check_default_username_prompt(
2736
 
            'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
 
3442
        self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
 
3443
        self._check_default_username_prompt(
 
3444
            u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
 
3445
        self._check_default_username_prompt(
 
3446
            u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
2737
3447
 
2738
3448
    def test_username_default_no_prompt(self):
2739
3449
        conf = config.AuthenticationConfig()
2745
3455
    def test_password_default_prompts(self):
2746
3456
        # HTTP prompts can't be tested here, see test_http.py
2747
3457
        self._check_default_password_prompt(
2748
 
            'FTP %(user)s@%(host)s password: ', 'ftp')
2749
 
        self._check_default_password_prompt(
2750
 
            'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
2751
 
        self._check_default_password_prompt(
2752
 
            'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
 
3458
            u'FTP %(user)s@%(host)s password: ', 'ftp')
 
3459
        self._check_default_password_prompt(
 
3460
            u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
 
3461
        self._check_default_password_prompt(
 
3462
            u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
2753
3463
        # SMTP port handling is a bit special (it's handled if embedded in the
2754
3464
        # host too)
2755
3465
        # FIXME: should we: forbid that, extend it to other schemes, leave
2756
3466
        # things as they are that's fine thank you ?
2757
 
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
2758
 
                                            'smtp')
2759
 
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
2760
 
                                            'smtp', host='bar.org:10025')
2761
 
        self._check_default_password_prompt(
2762
 
            'SMTP %(user)s@%(host)s:%(port)d password: ',
2763
 
            'smtp', port=10025)
 
3467
        self._check_default_password_prompt(
 
3468
            u'SMTP %(user)s@%(host)s password: ', 'smtp')
 
3469
        self._check_default_password_prompt(
 
3470
            u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
 
3471
        self._check_default_password_prompt(
 
3472
            u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
2764
3473
 
2765
3474
    def test_ssh_password_emits_warning(self):
2766
3475
        conf = config.AuthenticationConfig(_file=StringIO(
2960
3669
        to be able to choose a user name with no configuration.
2961
3670
        """
2962
3671
        if sys.platform == 'win32':
2963
 
            raise TestSkipped("User name inference not implemented on win32")
 
3672
            raise tests.TestSkipped(
 
3673
                "User name inference not implemented on win32")
2964
3674
        realname, address = config._auto_user_id()
2965
3675
        if os.path.exists('/etc/mailname'):
2966
3676
            self.assertIsNot(None, realname)