~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_config.py

  • Committer: INADA Naoki
  • Date: 2011-05-05 09:15:34 UTC
  • mto: (5830.3.3 i18n-msgfmt)
  • mto: This revision was merged to the branch mainline in revision 5873.
  • Revision ID: songofacandy@gmail.com-20110505091534-7sv835xpofwrmpt4
Add update-pot command to Makefile and tools/bzrgettext script that
extracts help text from bzr commands.

Show diffs side-by-side

added added

removed removed

Lines of Context:
36
36
    mergetools,
37
37
    ui,
38
38
    urlutils,
39
 
    registry,
40
 
    remote,
41
39
    tests,
42
40
    trace,
43
41
    transport,
44
42
    )
45
 
from bzrlib.symbol_versioning import (
46
 
    deprecated_in,
47
 
    deprecated_method,
48
 
    )
49
 
from bzrlib.transport import remote as transport_remote
50
43
from bzrlib.tests import (
51
44
    features,
 
45
    TestSkipped,
52
46
    scenarios,
53
 
    test_server,
54
47
    )
55
48
from bzrlib.util.configobj import configobj
56
49
 
69
62
 
70
63
load_tests = scenarios.load_tests_apply_scenarios
71
64
 
72
 
# Register helpers to build stores
73
 
config.test_store_builder_registry.register(
74
 
    'configobj', lambda test: config.IniFileStore(test.get_transport(),
75
 
                                                  'configobj.conf'))
76
 
config.test_store_builder_registry.register(
77
 
    'bazaar', lambda test: config.GlobalStore())
78
 
config.test_store_builder_registry.register(
79
 
    'location', lambda test: config.LocationStore())
80
 
 
81
 
 
82
 
def build_backing_branch(test, relpath,
83
 
                         transport_class=None, server_class=None):
84
 
    """Test helper to create a backing branch only once.
85
 
 
86
 
    Some tests needs multiple stores/stacks to check concurrent update
87
 
    behaviours. As such, they need to build different branch *objects* even if
88
 
    they share the branch on disk.
89
 
 
90
 
    :param relpath: The relative path to the branch. (Note that the helper
91
 
        should always specify the same relpath).
92
 
 
93
 
    :param transport_class: The Transport class the test needs to use.
94
 
 
95
 
    :param server_class: The server associated with the ``transport_class``
96
 
        above.
97
 
 
98
 
    Either both or neither of ``transport_class`` and ``server_class`` should
99
 
    be specified.
100
 
    """
101
 
    if transport_class is not None and server_class is not None:
102
 
        test.transport_class = transport_class
103
 
        test.transport_server = server_class
104
 
    elif not (transport_class is None and server_class is None):
105
 
        raise AssertionError('Specify both ``transport_class`` and '
106
 
                             '``server_class`` or neither of them')
107
 
    if getattr(test, 'backing_branch', None) is None:
108
 
        # First call, let's build the branch on disk
109
 
        test.backing_branch = test.make_branch(relpath)
110
 
 
111
 
 
112
 
def build_branch_store(test):
113
 
    build_backing_branch(test, 'branch')
114
 
    b = branch.Branch.open('branch')
115
 
    return config.BranchStore(b)
116
 
config.test_store_builder_registry.register('branch', build_branch_store)
117
 
 
118
 
 
119
 
def build_remote_branch_store(test):
120
 
    # There is only one permutation (but we won't be able to handle more with
121
 
    # this design anyway)
122
 
    (transport_class,
123
 
     server_class) = transport_remote.get_test_permutations()[0]
124
 
    build_backing_branch(test, 'branch', transport_class, server_class)
125
 
    b = branch.Branch.open(test.get_url('branch'))
126
 
    return config.BranchStore(b)
127
 
config.test_store_builder_registry.register('remote_branch',
128
 
                                            build_remote_branch_store)
129
 
 
130
 
 
131
 
config.test_stack_builder_registry.register(
132
 
    'bazaar', lambda test: config.GlobalStack())
133
 
config.test_stack_builder_registry.register(
134
 
    'location', lambda test: config.LocationStack('.'))
135
 
 
136
 
 
137
 
def build_branch_stack(test):
138
 
    build_backing_branch(test, 'branch')
139
 
    b = branch.Branch.open('branch')
140
 
    return config.BranchStack(b)
141
 
config.test_stack_builder_registry.register('branch', build_branch_stack)
142
 
 
143
 
 
144
 
def build_remote_branch_stack(test):
145
 
    # There is only one permutation (but we won't be able to handle more with
146
 
    # this design anyway)
147
 
    (transport_class,
148
 
     server_class) = transport_remote.get_test_permutations()[0]
149
 
    build_backing_branch(test, 'branch', transport_class, server_class)
150
 
    b = branch.Branch.open(test.get_url('branch'))
151
 
    return config.BranchStack(b)
152
 
config.test_stack_builder_registry.register('remote_branch',
153
 
                                            build_remote_branch_stack)
154
 
 
155
65
 
156
66
sample_long_alias="log -r-15..-1 --line"
157
67
sample_config_text = u"""
161
71
change_editor=vimdiff -of @new_path @old_path
162
72
gpg_signing_command=gnome-gpg
163
73
log_format=short
164
 
validate_signatures_in_log=true
165
 
acceptable_keys=amy
166
74
user_global_option=something
167
75
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
168
76
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
463
371
        config.Config()
464
372
 
465
373
    def test_no_default_editor(self):
466
 
        self.assertRaises(
467
 
            NotImplementedError,
468
 
            self.applyDeprecated, deprecated_in((2, 4, 0)),
469
 
            config.Config().get_editor)
 
374
        self.assertRaises(NotImplementedError, config.Config().get_editor)
470
375
 
471
376
    def test_user_email(self):
472
377
        my_config = InstrumentedConfig()
515
420
        my_config = config.Config()
516
421
        self.assertEqual('long', my_config.log_format())
517
422
 
518
 
    def test_acceptable_keys_default(self):
519
 
        my_config = config.Config()
520
 
        self.assertEqual(None, my_config.acceptable_keys())
521
 
 
522
 
    def test_validate_signatures_in_log_default(self):
523
 
        my_config = config.Config()
524
 
        self.assertEqual(False, my_config.validate_signatures_in_log())
525
 
 
526
423
    def test_get_change_editor(self):
527
424
        my_config = InstrumentedConfig()
528
425
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
709
606
    def test_default_is_True(self):
710
607
        self.config = self.get_config(True)
711
608
        self.assertExpandIs(True)
712
 
 
 
609
        
713
610
    def test_default_is_False(self):
714
611
        self.config = self.get_config(False)
715
612
        self.assertExpandIs(False)
716
 
 
 
613
        
717
614
 
718
615
class TestIniConfigOptionExpansion(tests.TestCase):
719
616
    """Test option expansion from the IniConfig level.
935
832
        def c1_write_config_file():
936
833
            before_writing.set()
937
834
            c1_orig()
938
 
            # The lock is held. We wait for the main thread to decide when to
 
835
            # The lock is held we wait for the main thread to decide when to
939
836
            # continue
940
837
            after_writing.wait()
941
838
        c1._write_config_file = c1_write_config_file
968
865
       c1_orig = c1._write_config_file
969
866
       def c1_write_config_file():
970
867
           ready_to_write.set()
971
 
           # The lock is held. We wait for the main thread to decide when to
 
868
           # The lock is held we wait for the main thread to decide when to
972
869
           # continue
973
870
           do_writing.wait()
974
871
           c1_orig()
1188
1085
 
1189
1086
    def test_configured_editor(self):
1190
1087
        my_config = config.GlobalConfig.from_string(sample_config_text)
1191
 
        editor = self.applyDeprecated(
1192
 
            deprecated_in((2, 4, 0)), my_config.get_editor)
1193
 
        self.assertEqual('vim', editor)
 
1088
        self.assertEqual("vim", my_config.get_editor())
1194
1089
 
1195
1090
    def test_signatures_always(self):
1196
1091
        my_config = config.GlobalConfig.from_string(sample_always_signatures)
1250
1145
        my_config = self._get_sample_config()
1251
1146
        self.assertEqual("short", my_config.log_format())
1252
1147
 
1253
 
    def test_configured_acceptable_keys(self):
1254
 
        my_config = self._get_sample_config()
1255
 
        self.assertEqual("amy", my_config.acceptable_keys())
1256
 
 
1257
 
    def test_configured_validate_signatures_in_log(self):
1258
 
        my_config = self._get_sample_config()
1259
 
        self.assertEqual(True, my_config.validate_signatures_in_log())
1260
 
 
1261
1148
    def test_get_alias(self):
1262
1149
        my_config = self._get_sample_config()
1263
1150
        self.assertEqual('help', my_config.get_alias('h'))
1889
1776
 
1890
1777
class TestTransportConfig(tests.TestCaseWithTransport):
1891
1778
 
1892
 
    def test_load_utf8(self):
1893
 
        """Ensure we can load an utf8-encoded file."""
1894
 
        t = self.get_transport()
1895
 
        unicode_user = u'b\N{Euro Sign}ar'
1896
 
        unicode_content = u'user=%s' % (unicode_user,)
1897
 
        utf8_content = unicode_content.encode('utf8')
1898
 
        # Store the raw content in the config file
1899
 
        t.put_bytes('foo.conf', utf8_content)
1900
 
        conf = config.TransportConfig(t, 'foo.conf')
1901
 
        self.assertEquals(unicode_user, conf.get_option('user'))
1902
 
 
1903
 
    def test_load_non_ascii(self):
1904
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
1905
 
        t = self.get_transport()
1906
 
        t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
1907
 
        conf = config.TransportConfig(t, 'foo.conf')
1908
 
        self.assertRaises(errors.ConfigContentError, conf._get_configobj)
1909
 
 
1910
 
    def test_load_erroneous_content(self):
1911
 
        """Ensure we display a proper error on content that can't be parsed."""
1912
 
        t = self.get_transport()
1913
 
        t.put_bytes('foo.conf', '[open_section\n')
1914
 
        conf = config.TransportConfig(t, 'foo.conf')
1915
 
        self.assertRaises(errors.ParseConfigError, conf._get_configobj)
1916
 
 
1917
1779
    def test_get_value(self):
1918
1780
        """Test that retreiving a value from a section is possible"""
1919
 
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
 
1781
        bzrdir_config = config.TransportConfig(transport.get_transport('.'),
1920
1782
                                               'control.conf')
1921
1783
        bzrdir_config.set_option('value', 'key', 'SECTION')
1922
1784
        bzrdir_config.set_option('value2', 'key2')
1952
1814
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1953
1815
 
1954
1816
 
1955
 
class TestOldConfigHooks(tests.TestCaseWithTransport):
1956
 
 
1957
 
    def setUp(self):
1958
 
        super(TestOldConfigHooks, self).setUp()
1959
 
        create_configs_with_file_option(self)
1960
 
 
1961
 
    def assertGetHook(self, conf, name, value):
1962
 
        calls = []
1963
 
        def hook(*args):
1964
 
            calls.append(args)
1965
 
        config.OldConfigHooks.install_named_hook('get', hook, None)
1966
 
        self.addCleanup(
1967
 
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
1968
 
        self.assertLength(0, calls)
1969
 
        actual_value = conf.get_user_option(name)
1970
 
        self.assertEquals(value, actual_value)
1971
 
        self.assertLength(1, calls)
1972
 
        self.assertEquals((conf, name, value), calls[0])
1973
 
 
1974
 
    def test_get_hook_bazaar(self):
1975
 
        self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
1976
 
 
1977
 
    def test_get_hook_locations(self):
1978
 
        self.assertGetHook(self.locations_config, 'file', 'locations')
1979
 
 
1980
 
    def test_get_hook_branch(self):
1981
 
        # Since locations masks branch, we define a different option
1982
 
        self.branch_config.set_user_option('file2', 'branch')
1983
 
        self.assertGetHook(self.branch_config, 'file2', 'branch')
1984
 
 
1985
 
    def assertSetHook(self, conf, name, value):
1986
 
        calls = []
1987
 
        def hook(*args):
1988
 
            calls.append(args)
1989
 
        config.OldConfigHooks.install_named_hook('set', hook, None)
1990
 
        self.addCleanup(
1991
 
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
1992
 
        self.assertLength(0, calls)
1993
 
        conf.set_user_option(name, value)
1994
 
        self.assertLength(1, calls)
1995
 
        # We can't assert the conf object below as different configs use
1996
 
        # different means to implement set_user_option and we care only about
1997
 
        # coverage here.
1998
 
        self.assertEquals((name, value), calls[0][1:])
1999
 
 
2000
 
    def test_set_hook_bazaar(self):
2001
 
        self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
2002
 
 
2003
 
    def test_set_hook_locations(self):
2004
 
        self.assertSetHook(self.locations_config, 'foo', 'locations')
2005
 
 
2006
 
    def test_set_hook_branch(self):
2007
 
        self.assertSetHook(self.branch_config, 'foo', 'branch')
2008
 
 
2009
 
    def assertRemoveHook(self, conf, name, section_name=None):
2010
 
        calls = []
2011
 
        def hook(*args):
2012
 
            calls.append(args)
2013
 
        config.OldConfigHooks.install_named_hook('remove', hook, None)
2014
 
        self.addCleanup(
2015
 
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
2016
 
        self.assertLength(0, calls)
2017
 
        conf.remove_user_option(name, section_name)
2018
 
        self.assertLength(1, calls)
2019
 
        # We can't assert the conf object below as different configs use
2020
 
        # different means to implement remove_user_option and we care only about
2021
 
        # coverage here.
2022
 
        self.assertEquals((name,), calls[0][1:])
2023
 
 
2024
 
    def test_remove_hook_bazaar(self):
2025
 
        self.assertRemoveHook(self.bazaar_config, 'file')
2026
 
 
2027
 
    def test_remove_hook_locations(self):
2028
 
        self.assertRemoveHook(self.locations_config, 'file',
2029
 
                              self.locations_config.location)
2030
 
 
2031
 
    def test_remove_hook_branch(self):
2032
 
        self.assertRemoveHook(self.branch_config, 'file')
2033
 
 
2034
 
    def assertLoadHook(self, name, conf_class, *conf_args):
2035
 
        calls = []
2036
 
        def hook(*args):
2037
 
            calls.append(args)
2038
 
        config.OldConfigHooks.install_named_hook('load', hook, None)
2039
 
        self.addCleanup(
2040
 
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
2041
 
        self.assertLength(0, calls)
2042
 
        # Build a config
2043
 
        conf = conf_class(*conf_args)
2044
 
        # Access an option to trigger a load
2045
 
        conf.get_user_option(name)
2046
 
        self.assertLength(1, calls)
2047
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2048
 
 
2049
 
    def test_load_hook_bazaar(self):
2050
 
        self.assertLoadHook('file', config.GlobalConfig)
2051
 
 
2052
 
    def test_load_hook_locations(self):
2053
 
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
2054
 
 
2055
 
    def test_load_hook_branch(self):
2056
 
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
2057
 
 
2058
 
    def assertSaveHook(self, conf):
2059
 
        calls = []
2060
 
        def hook(*args):
2061
 
            calls.append(args)
2062
 
        config.OldConfigHooks.install_named_hook('save', hook, None)
2063
 
        self.addCleanup(
2064
 
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
2065
 
        self.assertLength(0, calls)
2066
 
        # Setting an option triggers a save
2067
 
        conf.set_user_option('foo', 'bar')
2068
 
        self.assertLength(1, calls)
2069
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2070
 
 
2071
 
    def test_save_hook_bazaar(self):
2072
 
        self.assertSaveHook(self.bazaar_config)
2073
 
 
2074
 
    def test_save_hook_locations(self):
2075
 
        self.assertSaveHook(self.locations_config)
2076
 
 
2077
 
    def test_save_hook_branch(self):
2078
 
        self.assertSaveHook(self.branch_config)
2079
 
 
2080
 
 
2081
 
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
2082
 
    """Tests config hooks for remote configs.
2083
 
 
2084
 
    No tests for the remove hook as this is not implemented there.
2085
 
    """
2086
 
 
2087
 
    def setUp(self):
2088
 
        super(TestOldConfigHooksForRemote, self).setUp()
2089
 
        self.transport_server = test_server.SmartTCPServer_for_testing
2090
 
        create_configs_with_file_option(self)
2091
 
 
2092
 
    def assertGetHook(self, conf, name, value):
2093
 
        calls = []
2094
 
        def hook(*args):
2095
 
            calls.append(args)
2096
 
        config.OldConfigHooks.install_named_hook('get', hook, None)
2097
 
        self.addCleanup(
2098
 
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
2099
 
        self.assertLength(0, calls)
2100
 
        actual_value = conf.get_option(name)
2101
 
        self.assertEquals(value, actual_value)
2102
 
        self.assertLength(1, calls)
2103
 
        self.assertEquals((conf, name, value), calls[0])
2104
 
 
2105
 
    def test_get_hook_remote_branch(self):
2106
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2107
 
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
2108
 
 
2109
 
    def test_get_hook_remote_bzrdir(self):
2110
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2111
 
        conf = remote_bzrdir._get_config()
2112
 
        conf.set_option('remotedir', 'file')
2113
 
        self.assertGetHook(conf, 'file', 'remotedir')
2114
 
 
2115
 
    def assertSetHook(self, conf, name, value):
2116
 
        calls = []
2117
 
        def hook(*args):
2118
 
            calls.append(args)
2119
 
        config.OldConfigHooks.install_named_hook('set', hook, None)
2120
 
        self.addCleanup(
2121
 
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
2122
 
        self.assertLength(0, calls)
2123
 
        conf.set_option(value, name)
2124
 
        self.assertLength(1, calls)
2125
 
        # We can't assert the conf object below as different configs use
2126
 
        # different means to implement set_user_option and we care only about
2127
 
        # coverage here.
2128
 
        self.assertEquals((name, value), calls[0][1:])
2129
 
 
2130
 
    def test_set_hook_remote_branch(self):
2131
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2132
 
        self.addCleanup(remote_branch.lock_write().unlock)
2133
 
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
2134
 
 
2135
 
    def test_set_hook_remote_bzrdir(self):
2136
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2137
 
        self.addCleanup(remote_branch.lock_write().unlock)
2138
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2139
 
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
2140
 
 
2141
 
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
2142
 
        calls = []
2143
 
        def hook(*args):
2144
 
            calls.append(args)
2145
 
        config.OldConfigHooks.install_named_hook('load', hook, None)
2146
 
        self.addCleanup(
2147
 
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
2148
 
        self.assertLength(0, calls)
2149
 
        # Build a config
2150
 
        conf = conf_class(*conf_args)
2151
 
        # Access an option to trigger a load
2152
 
        conf.get_option(name)
2153
 
        self.assertLength(expected_nb_calls, calls)
2154
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2155
 
 
2156
 
    def test_load_hook_remote_branch(self):
2157
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2158
 
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
2159
 
 
2160
 
    def test_load_hook_remote_bzrdir(self):
2161
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2162
 
        # The config file doesn't exist, set an option to force its creation
2163
 
        conf = remote_bzrdir._get_config()
2164
 
        conf.set_option('remotedir', 'file')
2165
 
        # We get one call for the server and one call for the client, this is
2166
 
        # caused by the differences in implementations betwen
2167
 
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
2168
 
        # SmartServerBranchGetConfigFile (in smart/branch.py)
2169
 
        self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
2170
 
 
2171
 
    def assertSaveHook(self, conf):
2172
 
        calls = []
2173
 
        def hook(*args):
2174
 
            calls.append(args)
2175
 
        config.OldConfigHooks.install_named_hook('save', hook, None)
2176
 
        self.addCleanup(
2177
 
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
2178
 
        self.assertLength(0, calls)
2179
 
        # Setting an option triggers a save
2180
 
        conf.set_option('foo', 'bar')
2181
 
        self.assertLength(1, calls)
2182
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2183
 
 
2184
 
    def test_save_hook_remote_branch(self):
2185
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2186
 
        self.addCleanup(remote_branch.lock_write().unlock)
2187
 
        self.assertSaveHook(remote_branch._get_config())
2188
 
 
2189
 
    def test_save_hook_remote_bzrdir(self):
2190
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2191
 
        self.addCleanup(remote_branch.lock_write().unlock)
2192
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2193
 
        self.assertSaveHook(remote_bzrdir._get_config())
2194
 
 
2195
 
 
2196
 
class TestOption(tests.TestCase):
2197
 
 
2198
 
    def test_default_value(self):
2199
 
        opt = config.Option('foo', default='bar')
2200
 
        self.assertEquals('bar', opt.get_default())
2201
 
 
2202
 
 
2203
 
class TestOptionRegistry(tests.TestCase):
2204
 
 
2205
 
    def setUp(self):
2206
 
        super(TestOptionRegistry, self).setUp()
2207
 
        # Always start with an empty registry
2208
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2209
 
        self.registry = config.option_registry
2210
 
 
2211
 
    def test_register(self):
2212
 
        opt = config.Option('foo')
2213
 
        self.registry.register('foo', opt)
2214
 
        self.assertIs(opt, self.registry.get('foo'))
2215
 
 
2216
 
    lazy_option = config.Option('lazy_foo')
2217
 
 
2218
 
    def test_register_lazy(self):
2219
 
        self.registry.register_lazy('foo', self.__module__,
2220
 
                                    'TestOptionRegistry.lazy_option')
2221
 
        self.assertIs(self.lazy_option, self.registry.get('foo'))
2222
 
 
2223
 
    def test_registered_help(self):
2224
 
        opt = config.Option('foo')
2225
 
        self.registry.register('foo', opt, help='A simple option')
2226
 
        self.assertEquals('A simple option', self.registry.get_help('foo'))
2227
 
 
2228
 
 
2229
 
class TestRegisteredOptions(tests.TestCase):
2230
 
    """All registered options should verify some constraints."""
2231
 
 
2232
 
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2233
 
                 in config.option_registry.iteritems()]
2234
 
 
2235
 
    def setUp(self):
2236
 
        super(TestRegisteredOptions, self).setUp()
2237
 
        self.registry = config.option_registry
2238
 
 
2239
 
    def test_proper_name(self):
2240
 
        # An option should be registered under its own name, this can't be
2241
 
        # checked at registration time for the lazy ones.
2242
 
        self.assertEquals(self.option_name, self.option.name)
2243
 
 
2244
 
    def test_help_is_set(self):
2245
 
        option_help = self.registry.get_help(self.option_name)
2246
 
        self.assertNotEquals(None, option_help)
2247
 
        # Come on, think about the user, he really wants to know whst the
2248
 
        # option is about
2249
 
        self.assertNotEquals('', option_help)
2250
 
 
2251
 
 
2252
 
class TestSection(tests.TestCase):
2253
 
 
2254
 
    # FIXME: Parametrize so that all sections produced by Stores run these
2255
 
    # tests -- vila 2011-04-01
2256
 
 
2257
 
    def test_get_a_value(self):
2258
 
        a_dict = dict(foo='bar')
2259
 
        section = config.Section('myID', a_dict)
2260
 
        self.assertEquals('bar', section.get('foo'))
2261
 
 
2262
 
    def test_get_unknown_option(self):
2263
 
        a_dict = dict()
2264
 
        section = config.Section(None, a_dict)
2265
 
        self.assertEquals('out of thin air',
2266
 
                          section.get('foo', 'out of thin air'))
2267
 
 
2268
 
    def test_options_is_shared(self):
2269
 
        a_dict = dict()
2270
 
        section = config.Section(None, a_dict)
2271
 
        self.assertIs(a_dict, section.options)
2272
 
 
2273
 
 
2274
 
class TestMutableSection(tests.TestCase):
2275
 
 
2276
 
    # FIXME: Parametrize so that all sections (including os.environ and the
2277
 
    # ones produced by Stores) run these tests -- vila 2011-04-01
2278
 
 
2279
 
    def test_set(self):
2280
 
        a_dict = dict(foo='bar')
2281
 
        section = config.MutableSection('myID', a_dict)
2282
 
        section.set('foo', 'new_value')
2283
 
        self.assertEquals('new_value', section.get('foo'))
2284
 
        # The change appears in the shared section
2285
 
        self.assertEquals('new_value', a_dict.get('foo'))
2286
 
        # We keep track of the change
2287
 
        self.assertTrue('foo' in section.orig)
2288
 
        self.assertEquals('bar', section.orig.get('foo'))
2289
 
 
2290
 
    def test_set_preserve_original_once(self):
2291
 
        a_dict = dict(foo='bar')
2292
 
        section = config.MutableSection('myID', a_dict)
2293
 
        section.set('foo', 'first_value')
2294
 
        section.set('foo', 'second_value')
2295
 
        # We keep track of the original value
2296
 
        self.assertTrue('foo' in section.orig)
2297
 
        self.assertEquals('bar', section.orig.get('foo'))
2298
 
 
2299
 
    def test_remove(self):
2300
 
        a_dict = dict(foo='bar')
2301
 
        section = config.MutableSection('myID', a_dict)
2302
 
        section.remove('foo')
2303
 
        # We get None for unknown options via the default value
2304
 
        self.assertEquals(None, section.get('foo'))
2305
 
        # Or we just get the default value
2306
 
        self.assertEquals('unknown', section.get('foo', 'unknown'))
2307
 
        self.assertFalse('foo' in section.options)
2308
 
        # We keep track of the deletion
2309
 
        self.assertTrue('foo' in section.orig)
2310
 
        self.assertEquals('bar', section.orig.get('foo'))
2311
 
 
2312
 
    def test_remove_new_option(self):
2313
 
        a_dict = dict()
2314
 
        section = config.MutableSection('myID', a_dict)
2315
 
        section.set('foo', 'bar')
2316
 
        section.remove('foo')
2317
 
        self.assertFalse('foo' in section.options)
2318
 
        # The option didn't exist initially so it we need to keep track of it
2319
 
        # with a special value
2320
 
        self.assertTrue('foo' in section.orig)
2321
 
        self.assertEquals(config._NewlyCreatedOption, section.orig['foo'])
2322
 
 
2323
 
 
2324
 
class TestStore(tests.TestCaseWithTransport):
2325
 
 
2326
 
    def assertSectionContent(self, expected, section):
2327
 
        """Assert that some options have the proper values in a section."""
2328
 
        expected_name, expected_options = expected
2329
 
        self.assertEquals(expected_name, section.id)
2330
 
        self.assertEquals(
2331
 
            expected_options,
2332
 
            dict([(k, section.get(k)) for k in expected_options.keys()]))
2333
 
 
2334
 
 
2335
 
class TestReadonlyStore(TestStore):
2336
 
 
2337
 
    scenarios = [(key, {'get_store': builder}) for key, builder
2338
 
                 in config.test_store_builder_registry.iteritems()]
2339
 
 
2340
 
    def setUp(self):
2341
 
        super(TestReadonlyStore, self).setUp()
2342
 
 
2343
 
    def test_building_delays_load(self):
2344
 
        store = self.get_store(self)
2345
 
        self.assertEquals(False, store.is_loaded())
2346
 
        store._load_from_string('')
2347
 
        self.assertEquals(True, store.is_loaded())
2348
 
 
2349
 
    def test_get_no_sections_for_empty(self):
2350
 
        store = self.get_store(self)
2351
 
        store._load_from_string('')
2352
 
        self.assertEquals([], list(store.get_sections()))
2353
 
 
2354
 
    def test_get_default_section(self):
2355
 
        store = self.get_store(self)
2356
 
        store._load_from_string('foo=bar')
2357
 
        sections = list(store.get_sections())
2358
 
        self.assertLength(1, sections)
2359
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2360
 
 
2361
 
    def test_get_named_section(self):
2362
 
        store = self.get_store(self)
2363
 
        store._load_from_string('[baz]\nfoo=bar')
2364
 
        sections = list(store.get_sections())
2365
 
        self.assertLength(1, sections)
2366
 
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2367
 
 
2368
 
    def test_load_from_string_fails_for_non_empty_store(self):
2369
 
        store = self.get_store(self)
2370
 
        store._load_from_string('foo=bar')
2371
 
        self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2372
 
 
2373
 
 
2374
 
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2375
 
    """Simulate loading a config store without content of various encodings.
2376
 
 
2377
 
    All files produced by bzr are in utf8 content.
2378
 
 
2379
 
    Users may modify them manually and end up with a file that can't be
2380
 
    loaded. We need to issue proper error messages in this case.
2381
 
    """
2382
 
 
2383
 
    invalid_utf8_char = '\xff'
2384
 
 
2385
 
    def test_load_utf8(self):
2386
 
        """Ensure we can load an utf8-encoded file."""
2387
 
        t = self.get_transport()
2388
 
        # From http://pad.lv/799212
2389
 
        unicode_user = u'b\N{Euro Sign}ar'
2390
 
        unicode_content = u'user=%s' % (unicode_user,)
2391
 
        utf8_content = unicode_content.encode('utf8')
2392
 
        # Store the raw content in the config file
2393
 
        t.put_bytes('foo.conf', utf8_content)
2394
 
        store = config.IniFileStore(t, 'foo.conf')
2395
 
        store.load()
2396
 
        stack = config.Stack([store.get_sections], store)
2397
 
        self.assertEquals(unicode_user, stack.get('user'))
2398
 
 
2399
 
    def test_load_non_ascii(self):
2400
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
2401
 
        t = self.get_transport()
2402
 
        t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2403
 
        store = config.IniFileStore(t, 'foo.conf')
2404
 
        self.assertRaises(errors.ConfigContentError, store.load)
2405
 
 
2406
 
    def test_load_erroneous_content(self):
2407
 
        """Ensure we display a proper error on content that can't be parsed."""
2408
 
        t = self.get_transport()
2409
 
        t.put_bytes('foo.conf', '[open_section\n')
2410
 
        store = config.IniFileStore(t, 'foo.conf')
2411
 
        self.assertRaises(errors.ParseConfigError, store.load)
2412
 
 
2413
 
 
2414
 
class TestIniConfigContent(tests.TestCaseWithTransport):
2415
 
    """Simulate loading a IniBasedConfig without content of various encodings.
2416
 
 
2417
 
    All files produced by bzr are in utf8 content.
2418
 
 
2419
 
    Users may modify them manually and end up with a file that can't be
2420
 
    loaded. We need to issue proper error messages in this case.
2421
 
    """
2422
 
 
2423
 
    invalid_utf8_char = '\xff'
2424
 
 
2425
 
    def test_load_utf8(self):
2426
 
        """Ensure we can load an utf8-encoded file."""
2427
 
        # From http://pad.lv/799212
2428
 
        unicode_user = u'b\N{Euro Sign}ar'
2429
 
        unicode_content = u'user=%s' % (unicode_user,)
2430
 
        utf8_content = unicode_content.encode('utf8')
2431
 
        # Store the raw content in the config file
2432
 
        with open('foo.conf', 'wb') as f:
2433
 
            f.write(utf8_content)
2434
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2435
 
        self.assertEquals(unicode_user, conf.get_user_option('user'))
2436
 
 
2437
 
    def test_load_badly_encoded_content(self):
2438
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
2439
 
        with open('foo.conf', 'wb') as f:
2440
 
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
2441
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2442
 
        self.assertRaises(errors.ConfigContentError, conf._get_parser)
2443
 
 
2444
 
    def test_load_erroneous_content(self):
2445
 
        """Ensure we display a proper error on content that can't be parsed."""
2446
 
        with open('foo.conf', 'wb') as f:
2447
 
            f.write('[open_section\n')
2448
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2449
 
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
2450
 
 
2451
 
 
2452
 
class TestMutableStore(TestStore):
2453
 
 
2454
 
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2455
 
                 in config.test_store_builder_registry.iteritems()]
2456
 
 
2457
 
    def setUp(self):
2458
 
        super(TestMutableStore, self).setUp()
2459
 
        self.transport = self.get_transport()
2460
 
 
2461
 
    def has_store(self, store):
2462
 
        store_basename = urlutils.relative_url(self.transport.external_url(),
2463
 
                                               store.external_url())
2464
 
        return self.transport.has(store_basename)
2465
 
 
2466
 
    def test_save_empty_creates_no_file(self):
2467
 
        # FIXME: There should be a better way than relying on the test
2468
 
        # parametrization to identify branch.conf -- vila 2011-0526
2469
 
        if self.store_id in ('branch', 'remote_branch'):
2470
 
            raise tests.TestNotApplicable(
2471
 
                'branch.conf is *always* created when a branch is initialized')
2472
 
        store = self.get_store(self)
2473
 
        store.save()
2474
 
        self.assertEquals(False, self.has_store(store))
2475
 
 
2476
 
    def test_save_emptied_succeeds(self):
2477
 
        store = self.get_store(self)
2478
 
        store._load_from_string('foo=bar\n')
2479
 
        section = store.get_mutable_section(None)
2480
 
        section.remove('foo')
2481
 
        store.save()
2482
 
        self.assertEquals(True, self.has_store(store))
2483
 
        modified_store = self.get_store(self)
2484
 
        sections = list(modified_store.get_sections())
2485
 
        self.assertLength(0, sections)
2486
 
 
2487
 
    def test_save_with_content_succeeds(self):
2488
 
        # FIXME: There should be a better way than relying on the test
2489
 
        # parametrization to identify branch.conf -- vila 2011-0526
2490
 
        if self.store_id in ('branch', 'remote_branch'):
2491
 
            raise tests.TestNotApplicable(
2492
 
                'branch.conf is *always* created when a branch is initialized')
2493
 
        store = self.get_store(self)
2494
 
        store._load_from_string('foo=bar\n')
2495
 
        self.assertEquals(False, self.has_store(store))
2496
 
        store.save()
2497
 
        self.assertEquals(True, self.has_store(store))
2498
 
        modified_store = self.get_store(self)
2499
 
        sections = list(modified_store.get_sections())
2500
 
        self.assertLength(1, sections)
2501
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2502
 
 
2503
 
    def test_set_option_in_empty_store(self):
2504
 
        store = self.get_store(self)
2505
 
        section = store.get_mutable_section(None)
2506
 
        section.set('foo', 'bar')
2507
 
        store.save()
2508
 
        modified_store = self.get_store(self)
2509
 
        sections = list(modified_store.get_sections())
2510
 
        self.assertLength(1, sections)
2511
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2512
 
 
2513
 
    def test_set_option_in_default_section(self):
2514
 
        store = self.get_store(self)
2515
 
        store._load_from_string('')
2516
 
        section = store.get_mutable_section(None)
2517
 
        section.set('foo', 'bar')
2518
 
        store.save()
2519
 
        modified_store = self.get_store(self)
2520
 
        sections = list(modified_store.get_sections())
2521
 
        self.assertLength(1, sections)
2522
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2523
 
 
2524
 
    def test_set_option_in_named_section(self):
2525
 
        store = self.get_store(self)
2526
 
        store._load_from_string('')
2527
 
        section = store.get_mutable_section('baz')
2528
 
        section.set('foo', 'bar')
2529
 
        store.save()
2530
 
        modified_store = self.get_store(self)
2531
 
        sections = list(modified_store.get_sections())
2532
 
        self.assertLength(1, sections)
2533
 
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2534
 
 
2535
 
    def test_load_hook(self):
2536
 
        # We first needs to ensure that the store exists
2537
 
        store = self.get_store(self)
2538
 
        section = store.get_mutable_section('baz')
2539
 
        section.set('foo', 'bar')
2540
 
        store.save()
2541
 
        # Now we can try to load it
2542
 
        store = self.get_store(self)
2543
 
        calls = []
2544
 
        def hook(*args):
2545
 
            calls.append(args)
2546
 
        config.ConfigHooks.install_named_hook('load', hook, None)
2547
 
        self.assertLength(0, calls)
2548
 
        store.load()
2549
 
        self.assertLength(1, calls)
2550
 
        self.assertEquals((store,), calls[0])
2551
 
 
2552
 
    def test_save_hook(self):
2553
 
        calls = []
2554
 
        def hook(*args):
2555
 
            calls.append(args)
2556
 
        config.ConfigHooks.install_named_hook('save', hook, None)
2557
 
        self.assertLength(0, calls)
2558
 
        store = self.get_store(self)
2559
 
        section = store.get_mutable_section('baz')
2560
 
        section.set('foo', 'bar')
2561
 
        store.save()
2562
 
        self.assertLength(1, calls)
2563
 
        self.assertEquals((store,), calls[0])
2564
 
 
2565
 
 
2566
 
class TestIniFileStore(TestStore):
2567
 
 
2568
 
    def test_loading_unknown_file_fails(self):
2569
 
        store = config.IniFileStore(self.get_transport(), 'I-do-not-exist')
2570
 
        self.assertRaises(errors.NoSuchFile, store.load)
2571
 
 
2572
 
    def test_invalid_content(self):
2573
 
        store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2574
 
        self.assertEquals(False, store.is_loaded())
2575
 
        exc = self.assertRaises(
2576
 
            errors.ParseConfigError, store._load_from_string,
2577
 
            'this is invalid !')
2578
 
        self.assertEndsWith(exc.filename, 'foo.conf')
2579
 
        # And the load failed
2580
 
        self.assertEquals(False, store.is_loaded())
2581
 
 
2582
 
    def test_get_embedded_sections(self):
2583
 
        # A more complicated example (which also shows that section names and
2584
 
        # option names share the same name space...)
2585
 
        # FIXME: This should be fixed by forbidding dicts as values ?
2586
 
        # -- vila 2011-04-05
2587
 
        store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2588
 
        store._load_from_string('''
2589
 
foo=bar
2590
 
l=1,2
2591
 
[DEFAULT]
2592
 
foo_in_DEFAULT=foo_DEFAULT
2593
 
[bar]
2594
 
foo_in_bar=barbar
2595
 
[baz]
2596
 
foo_in_baz=barbaz
2597
 
[[qux]]
2598
 
foo_in_qux=quux
2599
 
''')
2600
 
        sections = list(store.get_sections())
2601
 
        self.assertLength(4, sections)
2602
 
        # The default section has no name.
2603
 
        # List values are provided as lists
2604
 
        self.assertSectionContent((None, {'foo': 'bar', 'l': ['1', '2']}),
2605
 
                                  sections[0])
2606
 
        self.assertSectionContent(
2607
 
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2608
 
        self.assertSectionContent(
2609
 
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
2610
 
        # sub sections are provided as embedded dicts.
2611
 
        self.assertSectionContent(
2612
 
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2613
 
            sections[3])
2614
 
 
2615
 
 
2616
 
class TestLockableIniFileStore(TestStore):
2617
 
 
2618
 
    def test_create_store_in_created_dir(self):
2619
 
        self.assertPathDoesNotExist('dir')
2620
 
        t = self.get_transport('dir/subdir')
2621
 
        store = config.LockableIniFileStore(t, 'foo.conf')
2622
 
        store.get_mutable_section(None).set('foo', 'bar')
2623
 
        store.save()
2624
 
        self.assertPathExists('dir/subdir')
2625
 
 
2626
 
 
2627
 
class TestConcurrentStoreUpdates(TestStore):
2628
 
    """Test that Stores properly handle conccurent updates.
2629
 
 
2630
 
    New Store implementation may fail some of these tests but until such
2631
 
    implementations exist it's hard to properly filter them from the scenarios
2632
 
    applied here. If you encounter such a case, contact the bzr devs.
2633
 
    """
2634
 
 
2635
 
    scenarios = [(key, {'get_stack': builder}) for key, builder
2636
 
                 in config.test_stack_builder_registry.iteritems()]
2637
 
 
2638
 
    def setUp(self):
2639
 
        super(TestConcurrentStoreUpdates, self).setUp()
2640
 
        self._content = 'one=1\ntwo=2\n'
2641
 
        self.stack = self.get_stack(self)
2642
 
        if not isinstance(self.stack, config._CompatibleStack):
2643
 
            raise tests.TestNotApplicable(
2644
 
                '%s is not meant to be compatible with the old config design'
2645
 
                % (self.stack,))
2646
 
        self.stack.store._load_from_string(self._content)
2647
 
        # Flush the store
2648
 
        self.stack.store.save()
2649
 
 
2650
 
    def test_simple_read_access(self):
2651
 
        self.assertEquals('1', self.stack.get('one'))
2652
 
 
2653
 
    def test_simple_write_access(self):
2654
 
        self.stack.set('one', 'one')
2655
 
        self.assertEquals('one', self.stack.get('one'))
2656
 
 
2657
 
    def test_listen_to_the_last_speaker(self):
2658
 
        c1 = self.stack
2659
 
        c2 = self.get_stack(self)
2660
 
        c1.set('one', 'ONE')
2661
 
        c2.set('two', 'TWO')
2662
 
        self.assertEquals('ONE', c1.get('one'))
2663
 
        self.assertEquals('TWO', c2.get('two'))
2664
 
        # The second update respect the first one
2665
 
        self.assertEquals('ONE', c2.get('one'))
2666
 
 
2667
 
    def test_last_speaker_wins(self):
2668
 
        # If the same config is not shared, the same variable modified twice
2669
 
        # can only see a single result.
2670
 
        c1 = self.stack
2671
 
        c2 = self.get_stack(self)
2672
 
        c1.set('one', 'c1')
2673
 
        c2.set('one', 'c2')
2674
 
        self.assertEquals('c2', c2.get('one'))
2675
 
        # The first modification is still available until another refresh
2676
 
        # occur
2677
 
        self.assertEquals('c1', c1.get('one'))
2678
 
        c1.set('two', 'done')
2679
 
        self.assertEquals('c2', c1.get('one'))
2680
 
 
2681
 
    def test_writes_are_serialized(self):
2682
 
        c1 = self.stack
2683
 
        c2 = self.get_stack(self)
2684
 
 
2685
 
        # We spawn a thread that will pause *during* the config saving.
2686
 
        before_writing = threading.Event()
2687
 
        after_writing = threading.Event()
2688
 
        writing_done = threading.Event()
2689
 
        c1_save_without_locking_orig = c1.store.save_without_locking
2690
 
        def c1_save_without_locking():
2691
 
            before_writing.set()
2692
 
            c1_save_without_locking_orig()
2693
 
            # The lock is held. We wait for the main thread to decide when to
2694
 
            # continue
2695
 
            after_writing.wait()
2696
 
        c1.store.save_without_locking = c1_save_without_locking
2697
 
        def c1_set():
2698
 
            c1.set('one', 'c1')
2699
 
            writing_done.set()
2700
 
        t1 = threading.Thread(target=c1_set)
2701
 
        # Collect the thread after the test
2702
 
        self.addCleanup(t1.join)
2703
 
        # Be ready to unblock the thread if the test goes wrong
2704
 
        self.addCleanup(after_writing.set)
2705
 
        t1.start()
2706
 
        before_writing.wait()
2707
 
        self.assertRaises(errors.LockContention,
2708
 
                          c2.set, 'one', 'c2')
2709
 
        self.assertEquals('c1', c1.get('one'))
2710
 
        # Let the lock be released
2711
 
        after_writing.set()
2712
 
        writing_done.wait()
2713
 
        c2.set('one', 'c2')
2714
 
        self.assertEquals('c2', c2.get('one'))
2715
 
 
2716
 
    def test_read_while_writing(self):
2717
 
       c1 = self.stack
2718
 
       # We spawn a thread that will pause *during* the write
2719
 
       ready_to_write = threading.Event()
2720
 
       do_writing = threading.Event()
2721
 
       writing_done = threading.Event()
2722
 
       # We override the _save implementation so we know the store is locked
2723
 
       c1_save_without_locking_orig = c1.store.save_without_locking
2724
 
       def c1_save_without_locking():
2725
 
           ready_to_write.set()
2726
 
           # The lock is held. We wait for the main thread to decide when to
2727
 
           # continue
2728
 
           do_writing.wait()
2729
 
           c1_save_without_locking_orig()
2730
 
           writing_done.set()
2731
 
       c1.store.save_without_locking = c1_save_without_locking
2732
 
       def c1_set():
2733
 
           c1.set('one', 'c1')
2734
 
       t1 = threading.Thread(target=c1_set)
2735
 
       # Collect the thread after the test
2736
 
       self.addCleanup(t1.join)
2737
 
       # Be ready to unblock the thread if the test goes wrong
2738
 
       self.addCleanup(do_writing.set)
2739
 
       t1.start()
2740
 
       # Ensure the thread is ready to write
2741
 
       ready_to_write.wait()
2742
 
       self.assertEquals('c1', c1.get('one'))
2743
 
       # If we read during the write, we get the old value
2744
 
       c2 = self.get_stack(self)
2745
 
       self.assertEquals('1', c2.get('one'))
2746
 
       # Let the writing occur and ensure it occurred
2747
 
       do_writing.set()
2748
 
       writing_done.wait()
2749
 
       # Now we get the updated value
2750
 
       c3 = self.get_stack(self)
2751
 
       self.assertEquals('c1', c3.get('one'))
2752
 
 
2753
 
    # FIXME: It may be worth looking into removing the lock dir when it's not
2754
 
    # needed anymore and look at possible fallouts for concurrent lockers. This
2755
 
    # will matter if/when we use config files outside of bazaar directories
2756
 
    # (.bazaar or .bzr) -- vila 20110-04-11
2757
 
 
2758
 
 
2759
 
class TestSectionMatcher(TestStore):
2760
 
 
2761
 
    scenarios = [('location', {'matcher': config.LocationMatcher})]
2762
 
 
2763
 
    def get_store(self, file_name):
2764
 
        return config.IniFileStore(self.get_readonly_transport(), file_name)
2765
 
 
2766
 
    def test_no_matches_for_empty_stores(self):
2767
 
        store = self.get_store('foo.conf')
2768
 
        store._load_from_string('')
2769
 
        matcher = self.matcher(store, '/bar')
2770
 
        self.assertEquals([], list(matcher.get_sections()))
2771
 
 
2772
 
    def test_build_doesnt_load_store(self):
2773
 
        store = self.get_store('foo.conf')
2774
 
        matcher = self.matcher(store, '/bar')
2775
 
        self.assertFalse(store.is_loaded())
2776
 
 
2777
 
 
2778
 
class TestLocationSection(tests.TestCase):
2779
 
 
2780
 
    def get_section(self, options, extra_path):
2781
 
        section = config.Section('foo', options)
2782
 
        # We don't care about the length so we use '0'
2783
 
        return config.LocationSection(section, 0, extra_path)
2784
 
 
2785
 
    def test_simple_option(self):
2786
 
        section = self.get_section({'foo': 'bar'}, '')
2787
 
        self.assertEquals('bar', section.get('foo'))
2788
 
 
2789
 
    def test_option_with_extra_path(self):
2790
 
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
2791
 
                                   'baz')
2792
 
        self.assertEquals('bar/baz', section.get('foo'))
2793
 
 
2794
 
    def test_invalid_policy(self):
2795
 
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
2796
 
                                   'baz')
2797
 
        # invalid policies are ignored
2798
 
        self.assertEquals('bar', section.get('foo'))
2799
 
 
2800
 
 
2801
 
class TestLocationMatcher(TestStore):
2802
 
 
2803
 
    def get_store(self, file_name):
2804
 
        return config.IniFileStore(self.get_readonly_transport(), file_name)
2805
 
 
2806
 
    def test_more_specific_sections_first(self):
2807
 
        store = self.get_store('foo.conf')
2808
 
        store._load_from_string('''
2809
 
[/foo]
2810
 
section=/foo
2811
 
[/foo/bar]
2812
 
section=/foo/bar
2813
 
''')
2814
 
        self.assertEquals(['/foo', '/foo/bar'],
2815
 
                          [section.id for section in store.get_sections()])
2816
 
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
2817
 
        sections = list(matcher.get_sections())
2818
 
        self.assertEquals([3, 2],
2819
 
                          [section.length for section in sections])
2820
 
        self.assertEquals(['/foo/bar', '/foo'],
2821
 
                          [section.id for section in sections])
2822
 
        self.assertEquals(['baz', 'bar/baz'],
2823
 
                          [section.extra_path for section in sections])
2824
 
 
2825
 
    def test_appendpath_in_no_name_section(self):
2826
 
        # It's a bit weird to allow appendpath in a no-name section, but
2827
 
        # someone may found a use for it
2828
 
        store = self.get_store('foo.conf')
2829
 
        store._load_from_string('''
2830
 
foo=bar
2831
 
foo:policy = appendpath
2832
 
''')
2833
 
        matcher = config.LocationMatcher(store, 'dir/subdir')
2834
 
        sections = list(matcher.get_sections())
2835
 
        self.assertLength(1, sections)
2836
 
        self.assertEquals('bar/dir/subdir', sections[0].get('foo'))
2837
 
 
2838
 
    def test_file_urls_are_normalized(self):
2839
 
        store = self.get_store('foo.conf')
2840
 
        if sys.platform == 'win32':
2841
 
            expected_url = 'file:///C:/dir/subdir'
2842
 
            expected_location = 'C:/dir/subdir'
2843
 
        else:
2844
 
            expected_url = 'file:///dir/subdir'
2845
 
            expected_location = '/dir/subdir'
2846
 
        matcher = config.LocationMatcher(store, expected_url)
2847
 
        self.assertEquals(expected_location, matcher.location)
2848
 
 
2849
 
 
2850
 
class TestStackGet(tests.TestCase):
2851
 
 
2852
 
    # FIXME: This should be parametrized for all known Stack or dedicated
2853
 
    # paramerized tests created to avoid bloating -- vila 2011-03-31
2854
 
 
2855
 
    def test_single_config_get(self):
2856
 
        conf = dict(foo='bar')
2857
 
        conf_stack = config.Stack([conf])
2858
 
        self.assertEquals('bar', conf_stack.get('foo'))
2859
 
 
2860
 
    def test_get_with_registered_default_value(self):
2861
 
        conf_stack = config.Stack([dict()])
2862
 
        opt = config.Option('foo', default='bar')
2863
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2864
 
        config.option_registry.register('foo', opt)
2865
 
        self.assertEquals('bar', conf_stack.get('foo'))
2866
 
 
2867
 
    def test_get_without_registered_default_value(self):
2868
 
        conf_stack = config.Stack([dict()])
2869
 
        opt = config.Option('foo')
2870
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2871
 
        config.option_registry.register('foo', opt)
2872
 
        self.assertEquals(None, conf_stack.get('foo'))
2873
 
 
2874
 
    def test_get_without_default_value_for_not_registered(self):
2875
 
        conf_stack = config.Stack([dict()])
2876
 
        opt = config.Option('foo')
2877
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2878
 
        self.assertEquals(None, conf_stack.get('foo'))
2879
 
 
2880
 
    def test_get_first_definition(self):
2881
 
        conf1 = dict(foo='bar')
2882
 
        conf2 = dict(foo='baz')
2883
 
        conf_stack = config.Stack([conf1, conf2])
2884
 
        self.assertEquals('bar', conf_stack.get('foo'))
2885
 
 
2886
 
    def test_get_embedded_definition(self):
2887
 
        conf1 = dict(yy='12')
2888
 
        conf2 = config.Stack([dict(xx='42'), dict(foo='baz')])
2889
 
        conf_stack = config.Stack([conf1, conf2])
2890
 
        self.assertEquals('baz', conf_stack.get('foo'))
2891
 
 
2892
 
    def test_get_for_empty_section_callable(self):
2893
 
        conf_stack = config.Stack([lambda : []])
2894
 
        self.assertEquals(None, conf_stack.get('foo'))
2895
 
 
2896
 
    def test_get_for_broken_callable(self):
2897
 
        # Trying to use and invalid callable raises an exception on first use
2898
 
        conf_stack = config.Stack([lambda : object()])
2899
 
        self.assertRaises(TypeError, conf_stack.get, 'foo')
2900
 
 
2901
 
 
2902
 
class TestStackWithTransport(tests.TestCaseWithTransport):
2903
 
 
2904
 
    scenarios = [(key, {'get_stack': builder}) for key, builder
2905
 
                 in config.test_stack_builder_registry.iteritems()]
2906
 
 
2907
 
 
2908
 
class TestConcreteStacks(TestStackWithTransport):
2909
 
 
2910
 
    def test_build_stack(self):
2911
 
        # Just a smoke test to help debug builders
2912
 
        stack = self.get_stack(self)
2913
 
 
2914
 
 
2915
 
class TestStackGet(TestStackWithTransport):
2916
 
 
2917
 
    def test_get_for_empty_stack(self):
2918
 
        conf = self.get_stack(self)
2919
 
        self.assertEquals(None, conf.get('foo'))
2920
 
 
2921
 
    def test_get_hook(self):
2922
 
        conf = self.get_stack(self)
2923
 
        conf.store._load_from_string('foo=bar')
2924
 
        calls = []
2925
 
        def hook(*args):
2926
 
            calls.append(args)
2927
 
        config.ConfigHooks.install_named_hook('get', hook, None)
2928
 
        self.assertLength(0, calls)
2929
 
        value = conf.get('foo')
2930
 
        self.assertEquals('bar', value)
2931
 
        self.assertLength(1, calls)
2932
 
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2933
 
 
2934
 
 
2935
 
class TestStackSet(TestStackWithTransport):
2936
 
 
2937
 
    def test_simple_set(self):
2938
 
        conf = self.get_stack(self)
2939
 
        conf.store._load_from_string('foo=bar')
2940
 
        self.assertEquals('bar', conf.get('foo'))
2941
 
        conf.set('foo', 'baz')
2942
 
        # Did we get it back ?
2943
 
        self.assertEquals('baz', conf.get('foo'))
2944
 
 
2945
 
    def test_set_creates_a_new_section(self):
2946
 
        conf = self.get_stack(self)
2947
 
        conf.set('foo', 'baz')
2948
 
        self.assertEquals, 'baz', conf.get('foo')
2949
 
 
2950
 
    def test_set_hook(self):
2951
 
        calls = []
2952
 
        def hook(*args):
2953
 
            calls.append(args)
2954
 
        config.ConfigHooks.install_named_hook('set', hook, None)
2955
 
        self.assertLength(0, calls)
2956
 
        conf = self.get_stack(self)
2957
 
        conf.set('foo', 'bar')
2958
 
        self.assertLength(1, calls)
2959
 
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2960
 
 
2961
 
 
2962
 
class TestStackRemove(TestStackWithTransport):
2963
 
 
2964
 
    def test_remove_existing(self):
2965
 
        conf = self.get_stack(self)
2966
 
        conf.store._load_from_string('foo=bar')
2967
 
        self.assertEquals('bar', conf.get('foo'))
2968
 
        conf.remove('foo')
2969
 
        # Did we get it back ?
2970
 
        self.assertEquals(None, conf.get('foo'))
2971
 
 
2972
 
    def test_remove_unknown(self):
2973
 
        conf = self.get_stack(self)
2974
 
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
2975
 
 
2976
 
    def test_remove_hook(self):
2977
 
        calls = []
2978
 
        def hook(*args):
2979
 
            calls.append(args)
2980
 
        config.ConfigHooks.install_named_hook('remove', hook, None)
2981
 
        self.assertLength(0, calls)
2982
 
        conf = self.get_stack(self)
2983
 
        conf.store._load_from_string('foo=bar')
2984
 
        conf.remove('foo')
2985
 
        self.assertLength(1, calls)
2986
 
        self.assertEquals((conf, 'foo'), calls[0])
2987
 
 
2988
 
 
2989
1817
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
2990
1818
 
2991
1819
    def setUp(self):
2992
1820
        super(TestConfigGetOptions, self).setUp()
2993
1821
        create_configs(self)
2994
1822
 
 
1823
    # One variable in none of the above
2995
1824
    def test_no_variable(self):
2996
1825
        # Using branch should query branch, locations and bazaar
2997
1826
        self.assertOptions([], self.branch_config)
3157
1986
        self.assertEquals({}, conf._get_config())
3158
1987
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
3159
1988
 
3160
 
    def test_non_utf8_config(self):
3161
 
        conf = config.AuthenticationConfig(_file=StringIO(
3162
 
                'foo = bar\xff'))
3163
 
        self.assertRaises(errors.ConfigContentError, conf._get_config)
3164
 
        
3165
1989
    def test_missing_auth_section_header(self):
3166
1990
        conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
3167
1991
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
3425
2249
 
3426
2250
    def test_username_defaults_prompts(self):
3427
2251
        # HTTP prompts can't be tested here, see test_http.py
3428
 
        self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
3429
 
        self._check_default_username_prompt(
3430
 
            u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
3431
 
        self._check_default_username_prompt(
3432
 
            u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
 
2252
        self._check_default_username_prompt('FTP %(host)s username: ', 'ftp')
 
2253
        self._check_default_username_prompt(
 
2254
            'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
 
2255
        self._check_default_username_prompt(
 
2256
            'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
3433
2257
 
3434
2258
    def test_username_default_no_prompt(self):
3435
2259
        conf = config.AuthenticationConfig()
3441
2265
    def test_password_default_prompts(self):
3442
2266
        # HTTP prompts can't be tested here, see test_http.py
3443
2267
        self._check_default_password_prompt(
3444
 
            u'FTP %(user)s@%(host)s password: ', 'ftp')
3445
 
        self._check_default_password_prompt(
3446
 
            u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
3447
 
        self._check_default_password_prompt(
3448
 
            u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
 
2268
            'FTP %(user)s@%(host)s password: ', 'ftp')
 
2269
        self._check_default_password_prompt(
 
2270
            'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
 
2271
        self._check_default_password_prompt(
 
2272
            'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
3449
2273
        # SMTP port handling is a bit special (it's handled if embedded in the
3450
2274
        # host too)
3451
2275
        # FIXME: should we: forbid that, extend it to other schemes, leave
3452
2276
        # things as they are that's fine thank you ?
3453
 
        self._check_default_password_prompt(
3454
 
            u'SMTP %(user)s@%(host)s password: ', 'smtp')
3455
 
        self._check_default_password_prompt(
3456
 
            u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
3457
 
        self._check_default_password_prompt(
3458
 
            u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
 
2277
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
 
2278
                                            'smtp')
 
2279
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
 
2280
                                            'smtp', host='bar.org:10025')
 
2281
        self._check_default_password_prompt(
 
2282
            'SMTP %(user)s@%(host)s:%(port)d password: ',
 
2283
            'smtp', port=10025)
3459
2284
 
3460
2285
    def test_ssh_password_emits_warning(self):
3461
2286
        conf = config.AuthenticationConfig(_file=StringIO(
3655
2480
        to be able to choose a user name with no configuration.
3656
2481
        """
3657
2482
        if sys.platform == 'win32':
3658
 
            raise tests.TestSkipped(
3659
 
                "User name inference not implemented on win32")
 
2483
            raise TestSkipped("User name inference not implemented on win32")
3660
2484
        realname, address = config._auto_user_id()
3661
2485
        if os.path.exists('/etc/mailname'):
3662
2486
            self.assertIsNot(None, realname)