715
class TestBranchConfigItems(TestCaseInTempDir):
717
def get_branch_config(self, global_config=None, location=None,
1794
class TestBranchConfigItems(tests.TestCaseInTempDir):
1796
def get_branch_config(self, global_config=None, location=None,
718
1797
location_config=None, branch_data_config=None):
719
my_config = config.BranchConfig(FakeBranch(location))
1798
my_branch = FakeBranch(location)
720
1799
if global_config is not None:
721
global_file = StringIO(global_config.encode('utf-8'))
722
my_config._get_global_config()._get_parser(global_file)
723
self.my_location_config = my_config._get_location_config()
1800
my_global_config = config.GlobalConfig.from_string(global_config,
724
1802
if location_config is not None:
725
location_file = StringIO(location_config.encode('utf-8'))
726
self.my_location_config._get_parser(location_file)
1803
my_location_config = config.LocationConfig.from_string(
1804
location_config, my_branch.base, save=True)
1805
my_config = config.BranchConfig(my_branch)
727
1806
if branch_data_config is not None:
728
1807
my_config.branch.control_files.files['branch.conf'] = \
729
1808
branch_data_config
730
1809
return my_config
732
1811
def test_user_id(self):
733
branch = FakeBranch(user_id='Robert Collins <robertc@example.net>')
1812
branch = FakeBranch()
734
1813
my_config = config.BranchConfig(branch)
735
self.assertEqual("Robert Collins <robertc@example.net>",
736
my_config.username())
737
branch.control_files.email = "John"
738
my_config.set_user_option('email',
1814
self.assertIsNot(None, my_config.username())
1815
my_config.branch.control_files.files['email'] = "John"
1816
my_config.set_user_option('email',
739
1817
"Robert Collins <robertc@example.org>")
740
self.assertEqual("John", my_config.username())
741
branch.control_files.email = None
742
1818
self.assertEqual("Robert Collins <robertc@example.org>",
743
my_config.username())
745
def test_not_set_in_branch(self):
746
my_config = self.get_branch_config(sample_config_text)
747
my_config.branch.control_files.email = None
748
self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
749
my_config._get_user_id())
750
my_config.branch.control_files.email = "John"
751
self.assertEqual("John", my_config._get_user_id())
1819
my_config.username())
753
1821
def test_BZR_EMAIL_OVERRIDES(self):
754
os.environ['BZR_EMAIL'] = "Robert Collins <robertc@example.org>"
1822
self.overrideEnv('BZR_EMAIL', "Robert Collins <robertc@example.org>")
755
1823
branch = FakeBranch()
756
1824
my_config = config.BranchConfig(branch)
757
1825
self.assertEqual("Robert Collins <robertc@example.org>",
758
1826
my_config.username())
760
1828
def test_signatures_forced(self):
761
1829
my_config = self.get_branch_config(
762
1830
global_config=sample_always_signatures)
763
self.assertEqual(config.CHECK_NEVER, my_config.signature_checking())
764
self.assertEqual(config.SIGN_ALWAYS, my_config.signing_policy())
765
self.assertTrue(my_config.signature_needed())
1831
self.assertEqual(config.CHECK_NEVER,
1832
self.applyDeprecated(deprecated_in((2, 5, 0)),
1833
my_config.signature_checking))
1834
self.assertEqual(config.SIGN_ALWAYS,
1835
self.applyDeprecated(deprecated_in((2, 5, 0)),
1836
my_config.signing_policy))
1837
self.assertTrue(self.applyDeprecated(deprecated_in((2, 5, 0)),
1838
my_config.signature_needed))
767
1840
def test_signatures_forced_branch(self):
768
1841
my_config = self.get_branch_config(
769
1842
global_config=sample_ignore_signatures,
770
1843
branch_data_config=sample_always_signatures)
771
self.assertEqual(config.CHECK_NEVER, my_config.signature_checking())
772
self.assertEqual(config.SIGN_ALWAYS, my_config.signing_policy())
773
self.assertTrue(my_config.signature_needed())
1844
self.assertEqual(config.CHECK_NEVER,
1845
self.applyDeprecated(deprecated_in((2, 5, 0)),
1846
my_config.signature_checking))
1847
self.assertEqual(config.SIGN_ALWAYS,
1848
self.applyDeprecated(deprecated_in((2, 5, 0)),
1849
my_config.signing_policy))
1850
self.assertTrue(self.applyDeprecated(deprecated_in((2, 5, 0)),
1851
my_config.signature_needed))
775
1853
def test_gpg_signing_command(self):
776
1854
my_config = self.get_branch_config(
1855
global_config=sample_config_text,
777
1856
# branch data cannot set gpg_signing_command
778
1857
branch_data_config="gpg_signing_command=pgp")
779
config_file = StringIO(sample_config_text.encode('utf-8'))
780
my_config._get_global_config()._get_parser(config_file)
781
self.assertEqual('gnome-gpg', my_config.gpg_signing_command())
1858
self.assertEqual('gnome-gpg',
1859
self.applyDeprecated(deprecated_in((2, 5, 0)),
1860
my_config.gpg_signing_command))
783
1862
def test_get_user_option_global(self):
784
branch = FakeBranch()
785
my_config = config.BranchConfig(branch)
786
config_file = StringIO(sample_config_text.encode('utf-8'))
787
(my_config._get_global_config()._get_parser(config_file))
1863
my_config = self.get_branch_config(global_config=sample_config_text)
788
1864
self.assertEqual('something',
789
1865
my_config.get_user_option('user_global_option'))
791
1867
def test_post_commit_default(self):
792
branch = FakeBranch()
793
my_config = self.get_branch_config(sample_config_text, '/a/c',
794
sample_branches_text)
1868
my_config = self.get_branch_config(global_config=sample_config_text,
1870
location_config=sample_branches_text)
795
1871
self.assertEqual(my_config.branch.base, '/a/c')
796
1872
self.assertEqual('bzrlib.tests.test_config.post_commit',
797
my_config.post_commit())
1873
self.applyDeprecated(deprecated_in((2, 5, 0)),
1874
my_config.post_commit))
798
1875
my_config.set_user_option('post_commit', 'rmtree_root')
799
# post-commit is ignored when bresent in branch data
1876
# post-commit is ignored when present in branch data
800
1877
self.assertEqual('bzrlib.tests.test_config.post_commit',
801
my_config.post_commit())
802
my_config.set_user_option('post_commit', 'rmtree_root', local=True)
803
self.assertEqual('rmtree_root', my_config.post_commit())
1878
self.applyDeprecated(deprecated_in((2, 5, 0)),
1879
my_config.post_commit))
1880
my_config.set_user_option('post_commit', 'rmtree_root',
1881
store=config.STORE_LOCATION)
1882
self.assertEqual('rmtree_root',
1883
self.applyDeprecated(deprecated_in((2, 5, 0)),
1884
my_config.post_commit))
805
1886
def test_config_precedence(self):
1887
# FIXME: eager test, luckily no persistent config file makes it fail
806
1889
my_config = self.get_branch_config(global_config=precedence_global)
807
1890
self.assertEqual(my_config.get_user_option('option'), 'global')
808
my_config = self.get_branch_config(global_config=precedence_global,
809
branch_data_config=precedence_branch)
1891
my_config = self.get_branch_config(global_config=precedence_global,
1892
branch_data_config=precedence_branch)
810
1893
self.assertEqual(my_config.get_user_option('option'), 'branch')
811
my_config = self.get_branch_config(global_config=precedence_global,
812
branch_data_config=precedence_branch,
813
location_config=precedence_location)
1894
my_config = self.get_branch_config(
1895
global_config=precedence_global,
1896
branch_data_config=precedence_branch,
1897
location_config=precedence_location)
814
1898
self.assertEqual(my_config.get_user_option('option'), 'recurse')
815
my_config = self.get_branch_config(global_config=precedence_global,
816
branch_data_config=precedence_branch,
817
location_config=precedence_location,
818
location='http://example.com/specific')
1899
my_config = self.get_branch_config(
1900
global_config=precedence_global,
1901
branch_data_config=precedence_branch,
1902
location_config=precedence_location,
1903
location='http://example.com/specific')
819
1904
self.assertEqual(my_config.get_user_option('option'), 'exact')
822
class TestMailAddressExtraction(TestCase):
1906
def test_get_mail_client(self):
1907
config = self.get_branch_config()
1908
client = config.get_mail_client()
1909
self.assertIsInstance(client, mail_client.DefaultMail)
1912
config.set_user_option('mail_client', 'evolution')
1913
client = config.get_mail_client()
1914
self.assertIsInstance(client, mail_client.Evolution)
1916
config.set_user_option('mail_client', 'kmail')
1917
client = config.get_mail_client()
1918
self.assertIsInstance(client, mail_client.KMail)
1920
config.set_user_option('mail_client', 'mutt')
1921
client = config.get_mail_client()
1922
self.assertIsInstance(client, mail_client.Mutt)
1924
config.set_user_option('mail_client', 'thunderbird')
1925
client = config.get_mail_client()
1926
self.assertIsInstance(client, mail_client.Thunderbird)
1929
config.set_user_option('mail_client', 'default')
1930
client = config.get_mail_client()
1931
self.assertIsInstance(client, mail_client.DefaultMail)
1933
config.set_user_option('mail_client', 'editor')
1934
client = config.get_mail_client()
1935
self.assertIsInstance(client, mail_client.Editor)
1937
config.set_user_option('mail_client', 'mapi')
1938
client = config.get_mail_client()
1939
self.assertIsInstance(client, mail_client.MAPIClient)
1941
config.set_user_option('mail_client', 'xdg-email')
1942
client = config.get_mail_client()
1943
self.assertIsInstance(client, mail_client.XDGEmail)
1945
config.set_user_option('mail_client', 'firebird')
1946
self.assertRaises(errors.UnknownMailClient, config.get_mail_client)
1949
class TestMailAddressExtraction(tests.TestCase):
824
1951
def test_extract_email_address(self):
825
1952
self.assertEqual('jane@test.com',
826
1953
config.extract_email_address('Jane <jane@test.com>'))
827
self.assertRaises(errors.BzrError,
1954
self.assertRaises(errors.NoEmailInUsername,
828
1955
config.extract_email_address, 'Jane Tester')
1957
def test_parse_username(self):
    # parse_username() is expected to return a (name, email) tuple.
    # A bare address, with or without angle brackets, gives an empty name.
    self.assertEqual(('', 'jdoe@example.com'),
                     config.parse_username('jdoe@example.com'))
    self.assertEqual(('', 'jdoe@example.com'),
                     config.parse_username('<jdoe@example.com>'))
    self.assertEqual(('John Doe', 'jdoe@example.com'),
                     config.parse_username('John Doe <jdoe@example.com>'))
    # Without any address the email part is empty.
    self.assertEqual(('John Doe', ''),
                     config.parse_username('John Doe'))
    # The angle brackets around the address are optional.
    self.assertEqual(('John Doe', 'jdoe@example.com'),
                     config.parse_username('John Doe jdoe@example.com'))
1969
class TestTreeConfig(tests.TestCaseWithTransport):
    """Exercise option storage and lookup through config.TreeConfig."""

    def test_get_value(self):
        """Test that retrieving a value from a section is possible."""
        branch = self.make_branch('.')
        tree_config = config.TreeConfig(branch)
        # set_option() takes the value first, then the name and an optional
        # section.
        tree_config.set_option('value', 'key', 'SECTION')
        tree_config.set_option('value2', 'key2')
        tree_config.set_option('value3-top', 'key3')
        tree_config.set_option('value3-section', 'key3', 'SECTION')
        value = tree_config.get_option('key', 'SECTION')
        self.assertEqual(value, 'value')
        value = tree_config.get_option('key2')
        self.assertEqual(value, 'value2')
        # Unknown options give None unless a default is supplied
        self.assertEqual(tree_config.get_option('non-existant'), None)
        value = tree_config.get_option('non-existant', 'SECTION')
        self.assertEqual(value, None)
        value = tree_config.get_option('non-existant', default='default')
        self.assertEqual(value, 'default')
        # The same holds for a known option looked up in an unknown section
        self.assertEqual(tree_config.get_option('key2', 'NOSECTION'), None)
        value = tree_config.get_option('key2', 'NOSECTION', default='default')
        self.assertEqual(value, 'default')
        # The same name can hold different values with and without a section
        value = tree_config.get_option('key3')
        self.assertEqual(value, 'value3-top')
        value = tree_config.get_option('key3', 'SECTION')
        self.assertEqual(value, 'value3-section')
1997
class TestTransportConfig(tests.TestCaseWithTransport):
1999
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    t = self.get_transport()
    unicode_user = u'b\N{Euro Sign}ar'
    unicode_content = u'user=%s' % (unicode_user,)
    utf8_content = unicode_content.encode('utf8')
    # Store the raw content in the config file
    t.put_bytes('foo.conf', utf8_content)
    conf = config.TransportConfig(t, 'foo.conf')
    # The option must come back as the original unicode text
    self.assertEqual(unicode_user, conf.get_option('user'))
2010
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    t = self.get_transport()
    # \xff can never appear in valid utf-8
    t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
    conf = config.TransportConfig(t, 'foo.conf')
    self.assertRaises(errors.ConfigContentError, conf._get_configobj)
2017
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    t = self.get_transport()
    # The section header is never closed
    t.put_bytes('foo.conf', '[open_section\n')
    conf = config.TransportConfig(t, 'foo.conf')
    self.assertRaises(errors.ParseConfigError, conf._get_configobj)
2024
def test_load_permission_denied(self):
2025
"""Ensure we get an empty config file if the file is inaccessible."""
2028
warnings.append(args[0] % args[1:])
2029
self.overrideAttr(trace, 'warning', warning)
2031
class DenyingTransport(object):
2033
def __init__(self, base):
2036
def get_bytes(self, relpath):
2037
raise errors.PermissionDenied(relpath, "")
2039
cfg = config.TransportConfig(
2040
DenyingTransport("nonexisting://"), 'control.conf')
2041
self.assertIs(None, cfg.get_option('non-existant', 'SECTION'))
2044
[u'Permission denied while trying to open configuration file '
2045
u'nonexisting:///control.conf.'])
2047
def test_get_value(self):
2048
"""Test that retrieving a value from a section is possible"""
2049
bzrdir_config = config.TransportConfig(self.get_transport('.'),
2051
bzrdir_config.set_option('value', 'key', 'SECTION')
2052
bzrdir_config.set_option('value2', 'key2')
2053
bzrdir_config.set_option('value3-top', 'key3')
2054
bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
2055
value = bzrdir_config.get_option('key', 'SECTION')
2056
self.assertEqual(value, 'value')
2057
value = bzrdir_config.get_option('key2')
2058
self.assertEqual(value, 'value2')
2059
self.assertEqual(bzrdir_config.get_option('non-existant'), None)
2060
value = bzrdir_config.get_option('non-existant', 'SECTION')
2061
self.assertEqual(value, None)
2062
value = bzrdir_config.get_option('non-existant', default='default')
2063
self.assertEqual(value, 'default')
2064
self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
2065
value = bzrdir_config.get_option('key2', 'NOSECTION',
2067
self.assertEqual(value, 'default')
2068
value = bzrdir_config.get_option('key3')
2069
self.assertEqual(value, 'value3-top')
2070
value = bzrdir_config.get_option('key3', 'SECTION')
2071
self.assertEqual(value, 'value3-section')
2073
def test_set_unset_default_stack_on(self):
    my_dir = self.make_bzrdir('.')
    bzrdir_config = config.BzrDirConfig(my_dir)
    # Nothing is configured on a fresh bzrdir
    self.assertIs(None, bzrdir_config.get_default_stack_on())
    bzrdir_config.set_default_stack_on('Foo')
    # The value is visible both in the underlying config and via the getter
    self.assertEqual('Foo', bzrdir_config._config.get_option(
        'default_stack_on'))
    self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
    # Setting None unsets the option again
    bzrdir_config.set_default_stack_on(None)
    self.assertIs(None, bzrdir_config.get_default_stack_on())
2085
class TestOldConfigHooks(tests.TestCaseWithTransport):
2088
super(TestOldConfigHooks, self).setUp()
2089
create_configs_with_file_option(self)
2091
def assertGetHook(self, conf, name, value):
2095
config.OldConfigHooks.install_named_hook('get', hook, None)
2097
config.OldConfigHooks.uninstall_named_hook, 'get', None)
2098
self.assertLength(0, calls)
2099
actual_value = conf.get_user_option(name)
2100
self.assertEquals(value, actual_value)
2101
self.assertLength(1, calls)
2102
self.assertEquals((conf, name, value), calls[0])
2104
def test_get_hook_bazaar(self):
    # Reading 'file' from the bazaar config must fire the 'get' hook once
    self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
2107
def test_get_hook_locations(self):
    # Reading 'file' from the locations config must fire the 'get' hook once
    self.assertGetHook(self.locations_config, 'file', 'locations')
2110
def test_get_hook_branch(self):
    # Since locations masks branch, we define a different option
    self.branch_config.set_user_option('file2', 'branch')
    self.assertGetHook(self.branch_config, 'file2', 'branch')
2115
def assertSetHook(self, conf, name, value):
2119
config.OldConfigHooks.install_named_hook('set', hook, None)
2121
config.OldConfigHooks.uninstall_named_hook, 'set', None)
2122
self.assertLength(0, calls)
2123
conf.set_user_option(name, value)
2124
self.assertLength(1, calls)
2125
# We can't assert the conf object below as different configs use
2126
# different means to implement set_user_option and we care only about
2128
self.assertEquals((name, value), calls[0][1:])
2130
def test_set_hook_bazaar(self):
    # Setting an option in the bazaar config must fire the 'set' hook once
    self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
2133
def test_set_hook_locations(self):
    # Setting an option in the locations config must fire the 'set' hook once
    self.assertSetHook(self.locations_config, 'foo', 'locations')
2136
def test_set_hook_branch(self):
    # Setting an option in the branch config must fire the 'set' hook once
    self.assertSetHook(self.branch_config, 'foo', 'branch')
2139
def assertRemoveHook(self, conf, name, section_name=None):
2143
config.OldConfigHooks.install_named_hook('remove', hook, None)
2145
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
2146
self.assertLength(0, calls)
2147
conf.remove_user_option(name, section_name)
2148
self.assertLength(1, calls)
2149
# We can't assert the conf object below as different configs use
2150
# different means to implement remove_user_option and we care only about
2152
self.assertEquals((name,), calls[0][1:])
2154
def test_remove_hook_bazaar(self):
    # Removing 'file' from the bazaar config must fire the 'remove' hook once
    self.assertRemoveHook(self.bazaar_config, 'file')
2157
def test_remove_hook_locations(self):
    # The config's own location is passed as the section name to remove from
    self.assertRemoveHook(self.locations_config, 'file',
                          self.locations_config.location)
2161
def test_remove_hook_branch(self):
    # Removing 'file' from the branch config must fire the 'remove' hook once
    self.assertRemoveHook(self.branch_config, 'file')
2164
def assertLoadHook(self, name, conf_class, *conf_args):
2168
config.OldConfigHooks.install_named_hook('load', hook, None)
2170
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2171
self.assertLength(0, calls)
2173
conf = conf_class(*conf_args)
2174
# Access an option to trigger a load
2175
conf.get_user_option(name)
2176
self.assertLength(1, calls)
2177
# Since we can't assert about conf, we just use the number of calls ;-/
2179
def test_load_hook_bazaar(self):
    # GlobalConfig is built without arguments
    self.assertLoadHook('file', config.GlobalConfig)
2182
def test_load_hook_locations(self):
    # LocationConfig is built from the tree base directory
    self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
2185
def test_load_hook_branch(self):
    # BranchConfig is built from the tree's branch
    self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
2188
def assertSaveHook(self, conf):
2192
config.OldConfigHooks.install_named_hook('save', hook, None)
2194
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2195
self.assertLength(0, calls)
2196
# Setting an option triggers a save
2197
conf.set_user_option('foo', 'bar')
2198
self.assertLength(1, calls)
2199
# Since we can't assert about conf, we just use the number of calls ;-/
2201
def test_save_hook_bazaar(self):
    # Saving the bazaar config must fire the 'save' hook once
    self.assertSaveHook(self.bazaar_config)
2204
def test_save_hook_locations(self):
    # Saving the locations config must fire the 'save' hook once
    self.assertSaveHook(self.locations_config)
2207
def test_save_hook_branch(self):
    # Saving the branch config must fire the 'save' hook once
    self.assertSaveHook(self.branch_config)
2211
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
2212
"""Tests config hooks for remote configs.
2214
No tests for the remove hook as this is not implemented there.
2218
super(TestOldConfigHooksForRemote, self).setUp()
2219
self.transport_server = test_server.SmartTCPServer_for_testing
2220
create_configs_with_file_option(self)
2222
def assertGetHook(self, conf, name, value):
2226
config.OldConfigHooks.install_named_hook('get', hook, None)
2228
config.OldConfigHooks.uninstall_named_hook, 'get', None)
2229
self.assertLength(0, calls)
2230
actual_value = conf.get_option(name)
2231
self.assertEquals(value, actual_value)
2232
self.assertLength(1, calls)
2233
self.assertEquals((conf, name, value), calls[0])
2235
def test_get_hook_remote_branch(self):
    # Open the branch through its URL and read 'file' from its config
    remote_branch = branch.Branch.open(self.get_url('tree'))
    self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
2239
def test_get_hook_remote_bzrdir(self):
    remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
    conf = remote_bzrdir._get_config()
    # set_option() takes the value first: this stores file=remotedir
    conf.set_option('remotedir', 'file')
    self.assertGetHook(conf, 'file', 'remotedir')
2245
def assertSetHook(self, conf, name, value):
2249
config.OldConfigHooks.install_named_hook('set', hook, None)
2251
config.OldConfigHooks.uninstall_named_hook, 'set', None)
2252
self.assertLength(0, calls)
2253
conf.set_option(value, name)
2254
self.assertLength(1, calls)
2255
# We can't assert the conf object below as different configs use
2256
# different means to implement set_user_option and we care only about
2258
self.assertEquals((name, value), calls[0][1:])
2260
def test_set_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Keep the branch write-locked for the test, unlock at cleanup
    self.addCleanup(remote_branch.lock_write().unlock)
    self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
2265
def test_set_hook_remote_bzrdir(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Keep the branch write-locked for the test, unlock at cleanup
    self.addCleanup(remote_branch.lock_write().unlock)
    remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
    self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
2271
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
2275
config.OldConfigHooks.install_named_hook('load', hook, None)
2277
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2278
self.assertLength(0, calls)
2280
conf = conf_class(*conf_args)
2281
# Access an option to trigger a load
2282
conf.get_option(name)
2283
self.assertLength(expected_nb_calls, calls)
2284
# Since we can't assert about conf, we just use the number of calls ;-/
2286
def test_load_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # A single load is expected for a remote branch config
    self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
2290
def test_load_hook_remote_bzrdir(self):
    remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
    # The config file doesn't exist, set an option to force its creation
    conf = remote_bzrdir._get_config()
    conf.set_option('remotedir', 'file')
    # We get one call for the server and one call for the client, this is
    # caused by the differences in implementations between
    # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
    # SmartServerBranchGetConfigFile (in smart/branch.py)
    self.assertLoadHook(2, 'file', remote.RemoteBzrDirConfig, remote_bzrdir)
2301
def assertSaveHook(self, conf):
2305
config.OldConfigHooks.install_named_hook('save', hook, None)
2307
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2308
self.assertLength(0, calls)
2309
# Setting an option triggers a save
2310
conf.set_option('foo', 'bar')
2311
self.assertLength(1, calls)
2312
# Since we can't assert about conf, we just use the number of calls ;-/
2314
def test_save_hook_remote_branch(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Keep the branch write-locked for the test, unlock at cleanup
    self.addCleanup(remote_branch.lock_write().unlock)
    self.assertSaveHook(remote_branch._get_config())
2319
def test_save_hook_remote_bzrdir(self):
    remote_branch = branch.Branch.open(self.get_url('tree'))
    # Keep the branch write-locked for the test, unlock at cleanup
    self.addCleanup(remote_branch.lock_write().unlock)
    remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
    self.assertSaveHook(remote_bzrdir._get_config())
2326
class TestOption(tests.TestCase):
2328
def test_default_value(self):
    # A plain default is returned unchanged
    opt = config.Option('foo', default='bar')
    self.assertEqual('bar', opt.get_default())
2332
def test_callable_default_value(self):
2333
def bar_as_unicode():
2335
opt = config.Option('foo', default=bar_as_unicode)
2336
self.assertEquals('bar', opt.get_default())
2338
def test_default_value_from_env(self):
    opt = config.Option('foo', default='bar', default_from_env=['FOO'])
    self.overrideEnv('FOO', 'quux')
    # Env variable provides a default taking over the option one
    self.assertEqual('quux', opt.get_default())
2344
def test_first_default_value_from_env_wins(self):
    opt = config.Option('foo', default='bar',
                        default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
    # NO_VALUE is not overridden by this test, only FOO and BAZ are
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # The first env var set wins
    self.assertEqual('foo', opt.get_default())
2352
def test_not_supported_list_default_value(self):
    # A list default is rejected when the option is built
    self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
2355
def test_not_supported_object_default_value(self):
2356
self.assertRaises(AssertionError, config.Option, 'foo',
2359
def test_not_supported_callable_default_value_not_unicode(self):
2360
def bar_not_unicode():
2362
opt = config.Option('foo', default=bar_not_unicode)
2363
self.assertRaises(AssertionError, opt.get_default)
2366
class TestOptionConverterMixin(object):
2368
def assertConverted(self, expected, opt, value):
    """Check that `opt` converts the unicode `value` into `expected`."""
    self.assertEqual(expected, opt.convert_from_unicode(None, value))
2371
def assertWarns(self, opt, value):
2374
warnings.append(args[0] % args[1:])
2375
self.overrideAttr(trace, 'warning', warning)
2376
self.assertEquals(None, opt.convert_from_unicode(None, value))
2377
self.assertLength(1, warnings)
2379
'Value "%s" is not valid for "%s"' % (value, opt.name),
2382
def assertErrors(self, opt, value):
    """Check that converting `value` raises ConfigOptionValueError."""
    self.assertRaises(errors.ConfigOptionValueError,
                      opt.convert_from_unicode, None, value)
2386
def assertConvertInvalid(self, opt, invalid_value):
2388
self.assertEquals(None, opt.convert_from_unicode(None, invalid_value))
2389
opt.invalid = 'warning'
2390
self.assertWarns(opt, invalid_value)
2391
opt.invalid = 'error'
2392
self.assertErrors(opt, invalid_value)
2395
class TestOptionWithBooleanConverter(tests.TestCase, TestOptionConverterMixin):
    """Check value conversion for an option using config.bool_from_store."""

    def get_option(self):
        """Return a new boolean option named 'foo'."""
        return config.Option('foo', help='A boolean.',
                             from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as a boolean
        self.assertConvertInvalid(opt, u'invalid-boolean')
        # A list of strings is never recognized as a boolean
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(True, opt, u'True')
        self.assertConverted(True, opt, u'1')
        self.assertConverted(False, opt, u'False')
2415
class TestOptionWithIntegerConverter(tests.TestCase, TestOptionConverterMixin):
    """Check value conversion for an option using config.int_from_store."""

    def get_option(self):
        """Return a new integer option named 'foo'."""
        return config.Option('foo', help='An integer.',
                             from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as an integer
        self.assertConvertInvalid(opt, u'forty-two')
        # A list of strings is never recognized as an integer
        self.assertConvertInvalid(opt, [u'a', u'list'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(16, opt, u'16')
2433
class TestOptionWithSIUnitConverter(tests.TestCase, TestOptionConverterMixin):
    """Check value conversion for an option using config.int_SI_from_store."""

    def get_option(self):
        """Return a new SI-unit integer option named 'foo'."""
        return config.Option('foo', help='An integer in SI units.',
                             from_unicode=config.int_SI_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        self.assertConvertInvalid(opt, u'not-a-unit')
        self.assertConvertInvalid(opt, u'Gb')  # Forgot the int
        self.assertConvertInvalid(opt, u'1b')  # Forgot the unit
        self.assertConvertInvalid(opt, u'1GG')
        self.assertConvertInvalid(opt, u'1Mbb')
        self.assertConvertInvalid(opt, u'1MM')

    def test_convert_valid(self):
        opt = self.get_option()
        # Multipliers are decimal and accepted in either case, with or
        # without a trailing 'b'
        self.assertConverted(int(5e3), opt, u'5kb')
        self.assertConverted(int(5e6), opt, u'5M')
        self.assertConverted(int(5e6), opt, u'5MB')
        self.assertConverted(int(5e9), opt, u'5g')
        self.assertConverted(int(5e9), opt, u'5gB')
        # A plain integer needs no unit
        self.assertConverted(100, opt, u'100')
2458
class TestListOption(tests.TestCase, TestOptionConverterMixin):
2460
def get_option(self):
    """Return a new list option named 'foo'."""
    return config.ListOption('foo', help='A list.')
2463
def test_convert_invalid(self):
2464
opt = self.get_option()
2465
# We don't even try to convert a list into a list, we only expect
2467
self.assertConvertInvalid(opt, [1])
2468
# No string is invalid as all forms can be converted to a list
2470
def test_convert_valid(self):
2471
opt = self.get_option()
2472
# An empty string is an empty list
2473
self.assertConverted([], opt, '') # Using a bare str() just in case
2474
self.assertConverted([], opt, u'')
2476
self.assertConverted([u'True'], opt, u'True')
2478
self.assertConverted([u'42'], opt, u'42')
2480
self.assertConverted([u'bar'], opt, u'bar')
2483
class TestOptionRegistry(tests.TestCase):
2486
super(TestOptionRegistry, self).setUp()
2487
# Always start with an empty registry
2488
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2489
self.registry = config.option_registry
2491
def test_register(self):
    opt = config.Option('foo')
    self.registry.register(opt)
    # The registry hands back the very same object
    self.assertIs(opt, self.registry.get('foo'))
2496
def test_registered_help(self):
    opt = config.Option('foo', help='A simple option')
    self.registry.register(opt)
    # The help text is available from the registry by option name
    self.assertEqual('A simple option', self.registry.get_help('foo'))
2501
lazy_option = config.Option('lazy_foo', help='Lazy help')
2503
def test_register_lazy(self):
    # The option is designated by module and attribute path instead of
    # being passed as an object
    self.registry.register_lazy('lazy_foo', self.__module__,
                                'TestOptionRegistry.lazy_option')
    self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
2508
def test_registered_lazy_help(self):
    self.registry.register_lazy('lazy_foo', self.__module__,
                                'TestOptionRegistry.lazy_option')
    # The help text of a lazily registered option is available too
    self.assertEqual('Lazy help', self.registry.get_help('lazy_foo'))
2514
class TestRegisteredOptions(tests.TestCase):
2515
"""All registered options should verify some constraints."""
2517
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2518
in config.option_registry.iteritems()]
2521
super(TestRegisteredOptions, self).setUp()
2522
self.registry = config.option_registry
2524
def test_proper_name(self):
    # An option should be registered under its own name, this can't be
    # checked at registration time for the lazy ones.
    self.assertEqual(self.option_name, self.option.name)
2529
def test_help_is_set(self):
2530
option_help = self.registry.get_help(self.option_name)
2531
self.assertNotEquals(None, option_help)
2532
# Come on, think about the user, he really wants to know what the
2534
self.assertIsNot(None, option_help)
2535
self.assertNotEquals('', option_help)
2538
class TestSection(tests.TestCase):
2540
# FIXME: Parametrize so that all sections produced by Stores run these
2541
# tests -- vila 2011-04-01
2543
def test_get_a_value(self):
    a_dict = dict(foo='bar')
    section = config.Section('myID', a_dict)
    # Values come straight from the dict given to the section
    self.assertEqual('bar', section.get('foo'))
2548
def test_get_unknown_option(self):
2550
section = config.Section(None, a_dict)
2551
self.assertEquals('out of thin air',
2552
section.get('foo', 'out of thin air'))
2554
def test_options_is_shared(self):
2556
section = config.Section(None, a_dict)
2557
self.assertIs(a_dict, section.options)
2560
class TestMutableSection(tests.TestCase):
2562
scenarios = [('mutable',
2564
lambda opts: config.MutableSection('myID', opts)},),
2568
a_dict = dict(foo='bar')
2569
section = self.get_section(a_dict)
2570
section.set('foo', 'new_value')
2571
self.assertEquals('new_value', section.get('foo'))
2572
# The change appears in the shared section
2573
self.assertEquals('new_value', a_dict.get('foo'))
2574
# We keep track of the change
2575
self.assertTrue('foo' in section.orig)
2576
self.assertEquals('bar', section.orig.get('foo'))
2578
def test_set_preserve_original_once(self):
    a_dict = dict(foo='bar')
    section = self.get_section(a_dict)
    section.set('foo', 'first_value')
    section.set('foo', 'second_value')
    # We keep track of the original value
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2587
def test_remove(self):
    """Removing an option drops it while remembering its original value."""
    options = dict(foo='bar')
    section = self.get_section(options)
    section.remove('foo')
    # We get None for unknown options via the default value
    self.assertEqual(None, section.get('foo'))
    # Or we just get the default value
    self.assertEqual('unknown', section.get('foo', 'unknown'))
    self.assertFalse('foo' in section.options)
    # We keep track of the deletion
    self.assertTrue('foo' in section.orig)
    self.assertEqual('bar', section.orig.get('foo'))
2600
def test_remove_new_option(self):
    """Removing an option that was just created is tracked specially."""
    # Start without 'foo' so that set() below really creates the option.
    options = dict()
    section = self.get_section(options)
    section.set('foo', 'bar')
    section.remove('foo')
    self.assertFalse('foo' in section.options)
    # The option didn't exist initially so we need to keep track of it
    # with a special value
    self.assertTrue('foo' in section.orig)
    self.assertEqual(config._NewlyCreatedOption, section.orig['foo'])
2612
class TestCommandLineStore(tests.TestCase):
    """Check how command line overrides end up in a CommandLineStore."""

    def setUp(self):
        super(TestCommandLineStore, self).setUp()
        self.store = config.CommandLineStore()
        # Run against a fresh option registry
        self.overrideAttr(config, 'option_registry', config.OptionRegistry())

    def get_section(self):
        """Get the unique section for the command line overrides."""
        all_sections = list(self.store.get_sections())
        self.assertLength(1, all_sections)
        owner, section = all_sections[0]
        self.assertEqual(self.store, owner)
        # Callers query the section, so it has to be handed back.
        return section

    def test_no_override(self):
        """Without overrides the section defines no option."""
        self.store._from_cmdline([])
        section = self.get_section()
        self.assertLength(0, list(section.iter_option_names()))

    def test_simple_override(self):
        """A single 'name=value' override defines the option."""
        self.store._from_cmdline(['a=b'])
        section = self.get_section()
        self.assertEqual('b', section.get('a'))

    def test_list_override(self):
        """A list override is stored as a string until it is converted."""
        list_option = config.ListOption('l')
        config.option_registry.register(list_option)
        self.store._from_cmdline(['l=1,2,3'])
        raw_value = self.get_section().get('l')
        self.assertEqual('1,2,3', raw_value)
        # Reminder: lists should be registered as such explicitly, otherwise
        # the conversion needs to be done afterwards.
        self.assertEqual(['1', '2', '3'],
                         list_option.convert_from_unicode(self.store,
                                                          raw_value))

    def test_multiple_overrides(self):
        """Several overrides define several options."""
        self.store._from_cmdline(['a=b', 'x=y'])
        section = self.get_section()
        self.assertEqual('b', section.get('a'))
        self.assertEqual('y', section.get('x'))

    def test_wrong_syntax(self):
        """An override without '=' raises BzrCommandError."""
        self.assertRaises(errors.BzrCommandError,
                          self.store._from_cmdline, ['a=b', 'c'])
2658
# Scenario-driven checks applied to every builder found in
# config.test_store_builder_registry plus a CommandLineStore ('cmdline').
# The statements after `scenarios` build a store, declare the test not
# applicable when the store type is exactly TransportIniFileStore and
# otherwise require `store.id` not to be None.
# NOTE(review): the `def` line of the test method owning those statements is
# not visible in this extract -- confirm against the complete file.
class TestStoreMinimalAPI(tests.TestCaseWithTransport):
2660
scenarios = [(key, {'get_store': builder}) for key, builder
2661
in config.test_store_builder_registry.iteritems()] + [
2662
('cmdline', {'get_store': lambda test: config.CommandLineStore()})]
2665
store = self.get_store(self)
2666
if type(store) == config.TransportIniFileStore:
2667
raise tests.TestNotApplicable(
2668
"%s is not a concrete Store implementation"
2669
" so it doesn't need an id" % (store.__class__.__name__,))
2670
self.assertIsNot(None, store.id)
2673
class TestStore(tests.TestCaseWithTransport):
    """Base class providing an assertion helper shared by the store tests."""

    def assertSectionContent(self, expected, store_and_section):
        """Assert that some options have the proper values in a section.

        :param expected: A ``(section_id, options)`` tuple, ``options`` being
            a dict of the option values to check.
        :param store_and_section: A two-item sequence whose second item is
            the section to check.
        """
        expected_name, expected_options = expected
        # Unpack here rather than in the signature: tuple parameters are not
        # valid syntax anymore in Python 3.
        _, section = store_and_section
        self.assertEqual(expected_name, section.id)
        # Only the options mentioned in 'expected' are compared.
        actual_options = dict((name, section.get(name))
                              for name in expected_options.keys())
        self.assertEqual(expected_options, actual_options)
2684
class TestReadonlyStore(TestStore):
    """Check loading and section listing for every registered store."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def test_building_delays_load(self):
        """A store is not loaded until content is given to it."""
        store = self.get_store(self)
        self.assertEqual(False, store.is_loaded())
        store._load_from_string('')
        self.assertEqual(True, store.is_loaded())

    def test_get_no_sections_for_empty(self):
        """Empty content provides no section."""
        store = self.get_store(self)
        store._load_from_string('')
        found = list(store.get_sections())
        self.assertEqual([], found)

    def test_get_default_section(self):
        """An option outside any section lands in the no-name section."""
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent((None, {'foo': 'bar'}), found[0])

    def test_get_named_section(self):
        """An option inside '[baz]' lands in the 'baz' section."""
        store = self.get_store(self)
        store._load_from_string('[baz]\nfoo=bar')
        found = list(store.get_sections())
        self.assertLength(1, found)
        self.assertSectionContent(('baz', {'foo': 'bar'}), found[0])

    def test_load_from_string_fails_for_non_empty_store(self):
        """Loading from a string twice raises AssertionError."""
        store = self.get_store(self)
        store._load_from_string('foo=bar')
        self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2720
class TestStoreQuoting(TestStore):
    """Check that quote() and unquote() round-trip option values."""

    scenarios = [(key, {'get_store': builder}) for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestStoreQuoting, self).setUp()
        self.store = self.get_store(self)
        # We need a loaded store but any content will do
        self.store._load_from_string('')

    def assertIdempotent(self, s):
        """Assert that quoting an unquoted string is a no-op and vice-versa.

        What matters here is that option values, as they appear in a store, can
        be safely round-tripped out of the store and back.

        :param s: A string, quoted if required.
        """
        store = self.store
        self.assertEqual(s, store.quote(store.unquote(s)))
        self.assertEqual(s, store.unquote(store.quote(s)))

    def test_empty_string(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj._quote doesn't handle empty values
            self.assertRaises(AssertionError,
                              self.assertIdempotent, '')
        else:
            self.assertIdempotent('')
        # But quoted empty strings are ok
        self.assertIdempotent('""')

    def test_embedded_spaces(self):
        self.assertIdempotent('" a b c "')

    def test_embedded_commas(self):
        self.assertIdempotent('" a , b c "')

    def test_simple_comma(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, ',')
        else:
            self.assertIdempotent(',')
        # When a single comma is required, quoting is also required
        self.assertIdempotent('","')

    def test_list(self):
        if isinstance(self.store, config.IniFileStore):
            # configobj requires that lists are special-cased
            self.assertRaises(AssertionError,
                              self.assertIdempotent, 'a,b')
        else:
            self.assertIdempotent('a,b')
2777
class TestDictFromStore(tests.TestCase):
    """Check what unquote() does with a value that is not a string."""

    def test_unquote_not_string(self):
        """unquote() hands a dict value back unchanged."""
        conf = config.MemoryStack('x=2\n[a_section]\na=1\n')
        value = conf.get('a_section')
        # Urgh, despite 'conf' asking for the no-name section, we get the
        # content of another section as a dict o_O
        self.assertEqual({'a': '1'}, value)
        unquoted = conf.store.unquote(value)
        # Which cannot be unquoted but shouldn't crash either (the use cases
        # are getting the value or displaying it with '%s').
        self.assertEqual({'a': '1'}, unquoted)
        self.assertEqual("{u'a': u'1'}", '%s' % (unquoted,))
2793
class TestIniFileStoreContent(tests.TestCaseWithTransport):
    """Simulate loading a config store with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # Byte used by the tests to build content that is not valid utf8
    invalid_utf8_char = '\xff'
2804
def test_load_utf8(self):
    """Ensure we can load an utf8-encoded file."""
    transport = self.get_transport()
    # From http://pad.lv/799212
    unicode_user = u'b\N{Euro Sign}ar'
    unicode_content = u'user=%s' % (unicode_user,)
    # Store the raw content in the config file
    transport.put_bytes('foo.conf', unicode_content.encode('utf8'))
    store = config.TransportIniFileStore(transport, 'foo.conf')
    stack = config.Stack([store.get_sections], store)
    self.assertEqual(unicode_user, stack.get('user'))
2818
def test_load_non_ascii(self):
    """Ensure we display a proper error on non-ascii, non utf-8 content."""
    transport = self.get_transport()
    # The comment line of the file carries the offending byte
    bad_content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
    transport.put_bytes('foo.conf', bad_content)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(errors.ConfigContentError, store.load)
2825
def test_load_erroneous_content(self):
    """Ensure we display a proper error on content that can't be parsed."""
    transport = self.get_transport()
    # A section header that is never closed
    unparsable = '[open_section\n'
    transport.put_bytes('foo.conf', unparsable)
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertRaises(errors.ParseConfigError, store.load)
2832
# Replaces trace.warning and the transport's get_bytes (which then always
# raises PermissionDenied), then checks that store.load propagates
# PermissionDenied.
# NOTE(review): `warnings`, `warning` and `args` are used but never defined in
# the visible lines, and the trailing list expression closes a call that is
# not opened here -- this extract looks incomplete, confirm against the
# complete file before editing.
def test_load_permission_denied(self):
2833
"""Ensure we get warned when trying to load an inaccessible file."""
2836
warnings.append(args[0] % args[1:])
2837
self.overrideAttr(trace, 'warning', warning)
2839
t = self.get_transport()
2841
def get_bytes(relpath):
2842
raise errors.PermissionDenied(relpath, "")
2843
t.get_bytes = get_bytes
2844
store = config.TransportIniFileStore(t, 'foo.conf')
2845
self.assertRaises(errors.PermissionDenied, store.load)
2848
[u'Permission denied while trying to load configuration store %s.'
2849
% store.external_url()])
2852
class TestIniConfigContent(tests.TestCaseWithTransport):
    """Simulate loading a IniBasedConfig with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # Byte used by the tests to build content that is not valid utf8
    invalid_utf8_char = '\xff'

    def test_load_utf8(self):
        """Ensure we can load an utf8-encoded file."""
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        utf8_content = (u'user=%s' % (unicode_user,)).encode('utf8')
        # Store the raw content in the config file
        with open('foo.conf', 'wb') as conf_file:
            conf_file.write(utf8_content)
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertEqual(unicode_user, conf.get_user_option('user'))

    def test_load_badly_encoded_content(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        bad_content = 'user=foo\n#%s\n' % (self.invalid_utf8_char,)
        with open('foo.conf', 'wb') as conf_file:
            conf_file.write(bad_content)
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ConfigContentError, conf._get_parser)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        with open('foo.conf', 'wb') as conf_file:
            conf_file.write('[open_section\n')
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
2890
class TestMutableStore(TestStore):
    """Check modifying the stores built by the registered builders."""

    # 'store_id' lets some tests tell which kind of store is being tested
    scenarios = [(key, {'store_id': key, 'get_store': builder})
                 for key, builder
                 in config.test_store_builder_registry.iteritems()]

    def setUp(self):
        super(TestMutableStore, self).setUp()
        self.transport = self.get_transport()
2899
def has_store(self, store):
    """Ask the test transport whether it has the file backing ``store``.

    :param store: A store providing ``external_url()``.
    :return: The result of ``self.transport.has()`` for the store url taken
        relatively to the transport url.
    """
    base_url = self.transport.external_url()
    relative_name = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relative_name)
2904
def test_save_empty_creates_no_file(self):
    """Check that no file backs a store that received no content."""
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        raise tests.TestNotApplicable(
            'branch.conf is *always* created when a branch is initialized')
    store = self.get_store(self)
    # NOTE(review): despite the test name, no save() call is visible before
    # this check -- confirm against the complete file.
    self.assertEqual(False, self.has_store(store))
2914
def test_save_emptied_succeeds(self):
    """Check a store whose only option has been removed."""
    store = self.get_store(self)
    store._load_from_string('foo=bar\n')
    section = store.get_mutable_section(None)
    section.remove('foo')
    # NOTE(review): nothing visible here saves the store before the checks
    # below -- confirm whether a save() call is expected at this point.
    self.assertEqual(True, self.has_store(store))
    reopened_store = self.get_store(self)
    found = list(reopened_store.get_sections())
    self.assertLength(0, found)
2925
def test_save_with_content_succeeds(self):
    """Check a store holding one option in the no-name section."""
    # FIXME: There should be a better way than relying on the test
    # parametrization to identify branch.conf -- vila 2011-0526
    if self.store_id in ('branch', 'remote_branch'):
        raise tests.TestNotApplicable(
            'branch.conf is *always* created when a branch is initialized')
    store = self.get_store(self)
    store._load_from_string('foo=bar\n')
    self.assertEqual(False, self.has_store(store))
    # NOTE(review): nothing visible between these two checks creates the
    # file -- confirm whether a save() call is expected at this point.
    self.assertEqual(True, self.has_store(store))
    reopened_store = self.get_store(self)
    found = list(reopened_store.get_sections())
    self.assertLength(1, found)
    self.assertSectionContent((None, {'foo': 'bar'}), found[0])
2941
def test_set_option_in_empty_store(self):
    """Set an option in the no-name section of a store given no content."""
    store = self.get_store(self)
    mutable = store.get_mutable_section(None)
    mutable.set('foo', 'bar')
    # NOTE(review): nothing visible here saves the store before it is
    # re-opened -- confirm whether a save() call is expected at this point.
    reopened_store = self.get_store(self)
    found = list(reopened_store.get_sections())
    self.assertLength(1, found)
    self.assertSectionContent((None, {'foo': 'bar'}), found[0])
2951
def test_set_option_in_default_section(self):
    """Set an option in the no-name section of a store loaded as empty."""
    store = self.get_store(self)
    store._load_from_string('')
    mutable = store.get_mutable_section(None)
    mutable.set('foo', 'bar')
    # NOTE(review): nothing visible here saves the store before it is
    # re-opened -- confirm whether a save() call is expected at this point.
    reopened_store = self.get_store(self)
    found = list(reopened_store.get_sections())
    self.assertLength(1, found)
    self.assertSectionContent((None, {'foo': 'bar'}), found[0])
2962
def test_set_option_in_named_section(self):
    """Set an option in the 'baz' section of a store loaded as empty."""
    store = self.get_store(self)
    store._load_from_string('')
    mutable = store.get_mutable_section('baz')
    mutable.set('foo', 'bar')
    # NOTE(review): nothing visible here saves the store before it is
    # re-opened -- confirm whether a save() call is expected at this point.
    reopened_store = self.get_store(self)
    found = list(reopened_store.get_sections())
    self.assertLength(1, found)
    self.assertSectionContent(('baz', {'foo': 'bar'}), found[0])
2973
# Installs a hook named 'load' on config.ConfigHooks and expects `calls` to
# go from 0 to 1 entries, the recorded entry being `(store,)`.
# NOTE(review): `hook` and `calls` are used but never defined in the visible
# lines, and nothing visible happens between the two assertLength() checks --
# this extract looks incomplete, confirm against the complete file.
def test_load_hook(self):
2974
# We first need to ensure that the store exists
2975
store = self.get_store(self)
2976
section = store.get_mutable_section('baz')
2977
section.set('foo', 'bar')
2979
# Now we can try to load it
2980
store = self.get_store(self)
2984
config.ConfigHooks.install_named_hook('load', hook, None)
2985
self.assertLength(0, calls)
2987
self.assertLength(1, calls)
2988
self.assertEquals((store,), calls[0])
2990
# Installs a hook named 'save' on config.ConfigHooks, sets an option in the
# 'baz' section and expects `calls` to hold a single `(store,)` entry.
# NOTE(review): `hook` and `calls` are used but never defined in the visible
# lines, and no save is visible before the final checks -- this extract looks
# incomplete, confirm against the complete file.
def test_save_hook(self):
2994
config.ConfigHooks.install_named_hook('save', hook, None)
2995
self.assertLength(0, calls)
2996
store = self.get_store(self)
2997
section = store.get_mutable_section('baz')
2998
section.set('foo', 'bar')
3000
self.assertLength(1, calls)
3001
self.assertEquals((store,), calls[0])
3003
def test_set_mark_dirty(self):
    """Setting an option records a dirty section in the store."""
    stack = config.MemoryStack('')
    # Nothing is dirty right after the stack is built
    self.assertLength(0, stack.store.dirty_sections)
    stack.set('foo', 'baz')
    # One section is now dirty and the store reports it needs saving
    self.assertLength(1, stack.store.dirty_sections)
    self.assertTrue(stack.store._need_saving())
3010
def test_remove_mark_dirty(self):
    """Check the dirty sections of a stack holding 'foo=bar'."""
    stack = config.MemoryStack('foo=bar')
    self.assertLength(0, stack.store.dirty_sections)
    # NOTE(review): despite the test name, nothing visible here removes an
    # option between the two checks -- confirm against the complete file.
    self.assertLength(1, stack.store.dirty_sections)
    self.assertTrue(stack.store._need_saving())
3018
# The statements after the docstring prepare each test: two
# TransportIniFileStore objects (st1, st2) are built on the same transport and
# the same 'foo.conf' name, and trace.warning is replaced by `warning`.
# NOTE(review): the `def` line owning the super().setUp() call, the
# initialisation of `self.warnings` and the definition of `warning`/`args` are
# not visible in this extract -- confirm against the complete file.
class TestStoreSaveChanges(tests.TestCaseWithTransport):
3019
"""Tests that config changes are kept in memory and saved on-demand."""
3022
super(TestStoreSaveChanges, self).setUp()
3023
self.transport = self.get_transport()
3024
# Most of the tests involve two stores pointing to the same persistent
3025
# storage to observe the effects of concurrent changes
3026
self.st1 = config.TransportIniFileStore(self.transport, 'foo.conf')
3027
self.st2 = config.TransportIniFileStore(self.transport, 'foo.conf')
3030
self.warnings.append(args[0] % args[1:])
3031
self.overrideAttr(trace, 'warning', warning)
3033
def has_store(self, store):
    """Ask the test transport whether it has the file backing ``store``.

    :param store: A store providing ``external_url()``.
    :return: The result of ``self.transport.has()`` for the store url taken
        relatively to the transport url.
    """
    base_url = self.transport.external_url()
    relative_name = urlutils.relative_url(base_url, store.external_url())
    return self.transport.has(relative_name)
3038
def get_stack(self, store):
    """Build a stack reading from and writing to ``store``.

    :param store: The store providing the sections and receiving changes.
    :return: A ``config.Stack`` using ``store.get_sections`` as its only
        section source.
    """
    # Any stack will do as long as it uses the right store, just a single
    # no-name section is enough
    section_sources = [store.get_sections]
    return config.Stack(section_sources, store)
3043
def test_no_changes_no_save(self):
    """save_changes() without any change creates no file for the store."""
    stack = self.get_stack(self.st1)
    stack.store.save_changes()
    self.assertEqual(False, self.has_store(self.st1))
3048
def test_unrelated_concurrent_update(self):
    """Two stacks setting different options emit no warning."""
    s1 = self.get_stack(self.st1)
    s2 = self.get_stack(self.st2)
    s1.set('foo', 'bar')
    s2.set('baz', 'quux')
    # NOTE(review): nothing visible here saves s1's change although s2 is
    # expected to see it below -- confirm against the complete file.
    # Changes don't propagate magically
    self.assertEqual(None, s1.get('baz'))
    s2.store.save_changes()
    self.assertEqual('quux', s2.get('baz'))
    # Changes are acquired when saving
    self.assertEqual('bar', s2.get('foo'))
    # Since there is no overlap, no warnings are emitted
    self.assertLength(0, self.warnings)
3063
def test_concurrent_update_modified(self):
    """Two stacks setting the same option emit one warning on save."""
    s1 = self.get_stack(self.st1)
    s2 = self.get_stack(self.st2)
    s1.set('foo', 'bar')
    s2.set('foo', 'baz')
    # NOTE(review): nothing visible here saves s1's change although the
    # expected warning mentions it -- confirm against the complete file.
    s2.store.save_changes()
    self.assertEqual('baz', s2.get('foo'))
    # But the user gets a warning
    self.assertLength(1, self.warnings)
    message = self.warnings[0]
    self.assertStartsWith(message, 'Option foo in section None')
    self.assertEndsWith(message, 'was changed from <CREATED> to bar.'
                        ' The baz value will be saved.')
3079
def test_concurrent_deletion(self):
    """Check the warning emitted when the second stack saves its changes."""
    self.st1._load_from_string('foo=bar')
    s1 = self.get_stack(self.st1)
    s2 = self.get_stack(self.st2)
    # NOTE(review): despite the test name, no removal of 'foo' is visible
    # here -- confirm against the complete file.
    s1.store.save_changes()
    self.assertLength(0, self.warnings)
    s2.store.save_changes()
    self.assertLength(1, self.warnings)
    message = self.warnings[0]
    self.assertStartsWith(message, 'Option foo in section None')
    self.assertEndsWith(message, 'was changed from bar to <CREATED>.'
                        ' The <DELETED> value will be saved.')
3098
class TestQuotingIniFileStore(tests.TestCaseWithTransport):
    """Check quoted values handling in a TransportIniFileStore."""

    def get_store(self):
        """Build a store for 'foo.conf' on the test transport."""
        transport = self.get_transport()
        return config.TransportIniFileStore(transport, 'foo.conf')

    def test_get_quoted_string(self):
        """Surrounding quotes are removed, embedded spaces are kept."""
        store = self.get_store()
        store._load_from_string('foo= " abc "')
        stack = config.Stack([store.get_sections])
        self.assertEqual(' abc ', stack.get('foo'))

    def test_set_quoted_string(self):
        """A value with leading/trailing spaces is written quoted."""
        store = self.get_store()
        stack = config.Stack([store.get_sections], store)
        stack.set('foo', ' a b c ')
        # NOTE(review): nothing visible here saves the store before the file
        # is checked -- confirm whether a save() call is expected here.
        expected_content = 'foo = " a b c "' + os.linesep
        self.assertFileEqual(expected_content, 'foo.conf')
3117
# The test below expects store.load to raise NoSuchFile.
# NOTE(review): the TransportIniFileStore(...) call is left open in the
# visible lines (its second argument and closing parenthesis are not shown) --
# this extract looks incomplete, confirm against the complete file.
class TestTransportIniFileStore(TestStore):
3119
def test_loading_unknown_file_fails(self):
3120
store = config.TransportIniFileStore(self.get_transport(),
3122
self.assertRaises(errors.NoSuchFile, store.load)
3124
def test_invalid_content(self):
    """Unparsable content raises ParseConfigError and nothing is loaded."""
    transport = self.get_transport()
    store = config.TransportIniFileStore(transport, 'foo.conf')
    self.assertEqual(False, store.is_loaded())
    exc = self.assertRaises(
        errors.ParseConfigError, store._load_from_string,
        'this is invalid !')
    # The error points to the store file
    self.assertEndsWith(exc.filename, 'foo.conf')
    # And the load failed
    self.assertEqual(False, store.is_loaded())
3134
# Loads a multi-section content in a TransportIniFileStore and expects four
# sections: the no-name one, 'DEFAULT', 'bar' and 'baz' (the latter holding a
# nested dict under 'qux').
# NOTE(review): most of the string given to _load_from_string(), its closing
# quotes and the tail of two assertSectionContent() calls are not visible in
# this extract -- confirm against the complete file before editing.
def test_get_embedded_sections(self):
3135
# A more complicated example (which also shows that section names and
3136
# option names share the same name space...)
3137
# FIXME: This should be fixed by forbidding dicts as values ?
3138
# -- vila 2011-04-05
3139
store = config.TransportIniFileStore(self.get_transport(), 'foo.conf')
3140
store._load_from_string('''
3144
foo_in_DEFAULT=foo_DEFAULT
3152
sections = list(store.get_sections())
3153
self.assertLength(4, sections)
3154
# The default section has no name.
3155
# List values are provided as strings and need to be explicitly
3156
# converted by specifying from_unicode=list_from_store at option
3158
self.assertSectionContent((None, {'foo': 'bar', 'l': u'1,2'}),
3160
self.assertSectionContent(
3161
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
3162
self.assertSectionContent(
3163
('bar', {'foo_in_bar': 'barbar'}), sections[2])
3164
# sub sections are provided as embedded dicts.
3165
self.assertSectionContent(
3166
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
3170
class TestLockableIniFileStore(TestStore):
    """Check a LockableIniFileStore located in a missing directory."""

    def test_create_store_in_created_dir(self):
        """Check 'dir/subdir' exists once an option has been set."""
        self.assertPathDoesNotExist('dir')
        transport = self.get_transport('dir/subdir')
        store = config.LockableIniFileStore(transport, 'foo.conf')
        section = store.get_mutable_section(None)
        section.set('foo', 'bar')
        # NOTE(review): nothing visible here saves the store before the
        # directory is checked -- confirm whether a save() call is expected.
        self.assertPathExists('dir/subdir')
3181
# Scenario-driven tests applied to every builder found in
# config.test_stack_builder_registry. The statements after `scenarios` prepare
# each test: stacks that are not config._CompatibleStack instances are
# declared not applicable, otherwise 'one' and 'two' are set to '1' and '2'
# and the store is saved.
# NOTE(review): the end of the class docstring, the `def` line owning the
# super().setUp() call and the end of the TestNotApplicable(...) call are not
# visible in this extract -- confirm against the complete file.
class TestConcurrentStoreUpdates(TestStore):
3182
"""Test that Stores properly handle conccurent updates.
3184
New Store implementation may fail some of these tests but until such
3185
implementations exist it's hard to properly filter them from the scenarios
3186
applied here. If you encounter such a case, contact the bzr devs.
3189
scenarios = [(key, {'get_stack': builder}) for key, builder
3190
in config.test_stack_builder_registry.iteritems()]
3193
super(TestConcurrentStoreUpdates, self).setUp()
3194
self.stack = self.get_stack(self)
3195
if not isinstance(self.stack, config._CompatibleStack):
3196
raise tests.TestNotApplicable(
3197
'%s is not meant to be compatible with the old config design'
3199
self.stack.set('one', '1')
3200
self.stack.set('two', '2')
3202
self.stack.store.save()
3204
def test_simple_read_access(self):
    """The stack under test gives '1' for option 'one'."""
    value = self.stack.get('one')
    self.assertEqual('1', value)
3207
def test_simple_write_access(self):
    """A value set through the stack is the one read back from it."""
    stack = self.stack
    stack.set('one', 'one')
    self.assertEqual('one', stack.get('one'))
3211
# Two stacks set different options; each sees its own change and the second
# one also sees the change made through the first.
# NOTE(review): `c1` is used but never assigned in the visible lines -- this
# extract looks incomplete, confirm against the complete file.
def test_listen_to_the_last_speaker(self):
3213
c2 = self.get_stack(self)
3214
c1.set('one', 'ONE')
3215
c2.set('two', 'TWO')
3216
self.assertEquals('ONE', c1.get('one'))
3217
self.assertEquals('TWO', c2.get('two'))
3218
# The second update respects the first one
3219
self.assertEquals('ONE', c2.get('one'))
3221
# Expects c2 to read 'c2' for option 'one', c1 to still read 'c1' and, once c1
# has set another option, c1 to read 'c2' as well.
# NOTE(review): `c1` is never assigned and the calls setting 'one' to 'c1' and
# 'c2' are not visible in this extract -- confirm against the complete file.
def test_last_speaker_wins(self):
3222
# If the same config is not shared, the same variable modified twice
3223
# can only see a single result.
3225
c2 = self.get_stack(self)
3228
self.assertEquals('c2', c2.get('one'))
3229
# The first modification is still available until another refresh
3231
self.assertEquals('c1', c1.get('one'))
3232
c1.set('two', 'done')
3233
self.assertEquals('c2', c1.get('one'))
3235
# Wraps c1.store.save_without_locking so that it signals `before_writing`,
# calls the original and then waits on `after_writing`; while a thread is
# blocked there, c2.set() is expected to raise LockContention.
# NOTE(review): `c1` and `c1_set` are used but never defined in the visible
# lines, and the statements starting the thread and releasing it are not
# shown -- this extract looks incomplete, confirm against the complete file.
def test_writes_are_serialized(self):
3237
c2 = self.get_stack(self)
3239
# We spawn a thread that will pause *during* the config saving.
3240
before_writing = threading.Event()
3241
after_writing = threading.Event()
3242
writing_done = threading.Event()
3243
c1_save_without_locking_orig = c1.store.save_without_locking
3244
def c1_save_without_locking():
3245
before_writing.set()
3246
c1_save_without_locking_orig()
3247
# The lock is held. We wait for the main thread to decide when to
3249
after_writing.wait()
3250
c1.store.save_without_locking = c1_save_without_locking
3254
t1 = threading.Thread(target=c1_set)
3255
# Collect the thread after the test
3256
self.addCleanup(t1.join)
3257
# Be ready to unblock the thread if the test goes wrong
3258
self.addCleanup(after_writing.set)
3260
before_writing.wait()
3261
self.assertRaises(errors.LockContention,
3262
c2.set, 'one', 'c2')
3263
self.assertEquals('c1', c1.get('one'))
3264
# Let the lock be released
3268
self.assertEquals('c2', c2.get('one'))
3270
# Wraps c1.store.save_without_locking so that it signals `ready_to_write`
# before calling the original; a stack built while the write is pending is
# expected to read the old value '1', a stack built afterwards to read 'c1'.
# NOTE(review): `c1` and `c1_set` are used but never defined in the visible
# lines, and the statements starting the thread and letting the write proceed
# are not shown -- this extract looks incomplete, confirm against the
# complete file.
def test_read_while_writing(self):
3272
# We spawn a thread that will pause *during* the write
3273
ready_to_write = threading.Event()
3274
do_writing = threading.Event()
3275
writing_done = threading.Event()
3276
# We override the _save implementation so we know the store is locked
3277
c1_save_without_locking_orig = c1.store.save_without_locking
3278
def c1_save_without_locking():
3279
ready_to_write.set()
3280
# The lock is held. We wait for the main thread to decide when to
3283
c1_save_without_locking_orig()
3285
c1.store.save_without_locking = c1_save_without_locking
3288
t1 = threading.Thread(target=c1_set)
3289
# Collect the thread after the test
3290
self.addCleanup(t1.join)
3291
# Be ready to unblock the thread if the test goes wrong
3292
self.addCleanup(do_writing.set)
3294
# Ensure the thread is ready to write
3295
ready_to_write.wait()
3296
self.assertEquals('c1', c1.get('one'))
3297
# If we read during the write, we get the old value
3298
c2 = self.get_stack(self)
3299
self.assertEquals('1', c2.get('one'))
3300
# Let the writing occur and ensure it occurred
3303
# Now we get the updated value
3304
c3 = self.get_stack(self)
3305
self.assertEquals('c1', c3.get('one'))
3307
# FIXME: It may be worth looking into removing the lock dir when it's not
3308
# needed anymore and look at possible fallouts for concurrent lockers. This
3309
# will matter if/when we use config files outside of bazaar directories
3310
# (.bazaar or .bzr) -- vila 20110-04-111
3313
class TestSectionMatcher(TestStore):
    """Checks shared by the LocationMatcher and NameMatcher scenarios."""

    scenarios = [('location', {'matcher': config.LocationMatcher}),
                 ('id', {'matcher': config.NameMatcher}),]

    def setUp(self):
        super(TestSectionMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')

    def test_no_matches_for_empty_stores(self):
        """A matcher finds no section in an empty store."""
        store = self.get_store(self)
        store._load_from_string('')
        matcher = self.matcher(store, '/bar')
        self.assertEqual([], list(matcher.get_sections()))

    def test_build_doesnt_load_store(self):
        """Building a matcher leaves the store unloaded."""
        store = self.get_store(self)
        # Only building the matcher matters, it isn't used afterwards.
        self.matcher(store, '/bar')
        self.assertFalse(store.is_loaded())
3335
class TestLocationSection(tests.TestCase):

    def get_section(self, options, extra_path):
        """Wrap ``options`` into a LocationSection.

        :param options: The dict of options given to the 'foo' section.
        :param extra_path: Passed as is to ``config.LocationSection``.
        """
        base_section = config.Section('foo', options)
        return config.LocationSection(base_section, extra_path)
3341
def test_simple_option(self):
    """An option without policy is returned as is."""
    options = {'foo': 'bar'}
    section = self.get_section(options, '')
    self.assertEqual('bar', section.get('foo'))
3345
# With the 'appendpath' policy the value of 'foo' is expected to be
# 'bar/baz'.
# NOTE(review): the get_section(...) call is left open in the visible lines
# (its second argument is not shown) -- this extract looks incomplete,
# confirm against the complete file.
def test_option_with_extra_path(self):
3346
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
3348
self.assertEquals('bar/baz', section.get('foo'))
3350
# With the unknown 'die' policy the value of 'foo' is expected to stay 'bar'.
# NOTE(review): the get_section(...) call is left open in the visible lines
# (its second argument is not shown) -- this extract looks incomplete,
# confirm against the complete file.
def test_invalid_policy(self):
3351
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
3353
# invalid policies are ignored
3354
self.assertEquals('bar', section.get('foo'))
3357
class TestLocationMatcher(TestStore):

    def setUp(self):
        super(TestLocationMatcher, self).setUp()
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')
3364
# For location '/foo/bar/quux' the matcher is expected to give the '/foo/bar'
# and '/foo' sections, in that order, with 'quux' and 'bar/quux' as their
# extra_path.
# NOTE(review): most of the string given to _load_from_string(), its closing
# quotes and the end of the first expected list are not visible in this
# extract -- confirm against the complete file before editing.
def test_unrelated_section_excluded(self):
3365
store = self.get_store(self)
3366
store._load_from_string('''
3374
section=/foo/bar/baz
3378
self.assertEquals(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
3380
[section.id for _, section in store.get_sections()])
3381
matcher = config.LocationMatcher(store, '/foo/bar/quux')
3382
sections = [section for _, section in matcher.get_sections()]
3383
self.assertEquals(['/foo/bar', '/foo'],
3384
[section.id for section in sections])
3385
self.assertEquals(['quux', 'bar/quux'],
3386
[section.extra_path for section in sections])
3388
# The store lists '/foo' before '/foo/bar' but for location '/foo/bar/baz'
# the matcher is expected to give '/foo/bar' first, with 'baz' and 'bar/baz'
# as the extra_path of the two sections.
# NOTE(review): the string given to _load_from_string() and its closing
# quotes are not visible in this extract -- confirm against the complete file
# before editing.
def test_more_specific_sections_first(self):
3389
store = self.get_store(self)
3390
store._load_from_string('''
3396
self.assertEquals(['/foo', '/foo/bar'],
3397
[section.id for _, section in store.get_sections()])
3398
matcher = config.LocationMatcher(store, '/foo/bar/baz')
3399
sections = [section for _, section in matcher.get_sections()]
3400
self.assertEquals(['/foo/bar', '/foo'],
3401
[section.id for section in sections])
3402
self.assertEquals(['baz', 'bar/baz'],
3403
[section.extra_path for section in sections])
3405
# For location 'dir/subdir' a single section is expected and its 'foo' option
# is expected to be 'bar/dir/subdir'.
# NOTE(review): part of the string given to _load_from_string() and its
# closing quotes are not visible in this extract -- confirm against the
# complete file before editing.
def test_appendpath_in_no_name_section(self):
3406
# It's a bit weird to allow appendpath in a no-name section, but
3407
# someone may find a use for it
3408
store = self.get_store(self)
3409
store._load_from_string('''
3411
foo:policy = appendpath
3413
matcher = config.LocationMatcher(store, 'dir/subdir')
3414
sections = list(matcher.get_sections())
3415
self.assertLength(1, sections)
3416
self.assertEquals('bar/dir/subdir', sections[0][1].get('foo'))
3418
def test_file_urls_are_normalized(self):
    """A file url given to the matcher is exposed as a local path."""
    store = self.get_store(self)
    # The url and the matching local path depend on the platform
    if sys.platform == 'win32':
        expected_url = 'file:///C:/dir/subdir'
        expected_location = 'C:/dir/subdir'
    else:
        expected_url = 'file:///dir/subdir'
        expected_location = '/dir/subdir'
    matcher = config.LocationMatcher(store, expected_url)
    self.assertEqual(expected_location, matcher.location)
3430
class TestStartingPathMatcher(TestStore):

    def setUp(self):
        super(TestStartingPathMatcher, self).setUp()
        # Any simple store is good enough
        self.store = config.IniFileStore()
3437
def assertSectionIDs(self, expected, location, content):
    """Assert which sections match a location, and in which order.

    :param expected: The ids of the sections expected to match, in order.
    :param location: The location given to ``config.StartingPathMatcher``.
    :param content: The content loaded into ``self.store``.
    :return: The list produced by the matcher, pairs whose second item is
        the section, so that callers can run further checks.
    """
    self.store._load_from_string(content)
    matcher = config.StartingPathMatcher(self.store, location)
    sections = list(matcher.get_sections())
    self.assertLength(len(expected), sections)
    self.assertEqual(expected, [section.id for _, section in sections])
    # Callers iterate over the result, so it has to be handed back.
    return sections
3445
def test_empty(self):
    """No section matches when the store content is empty."""
    location = self.get_url()
    self.assertSectionIDs([], location, '')
3448
# Expects the '/foo/bar' and '/foo' sections, in that order, for the url
# 'file:///foo/bar/baz'.
# NOTE(review): the store content (the string opened on the last line) is not
# visible in this extract -- confirm against the complete file before editing.
def test_url_vs_local_paths(self):
3449
# The matcher location is an url and the section names are local paths
3450
sections = self.assertSectionIDs(['/foo/bar', '/foo'],
3451
'file:///foo/bar/baz', '''\
3456
# Expects the 'file:///foo/bar' and 'file:///foo' sections, in that order, for
# the local path '/foo/bar/baz'.
# NOTE(review): the store content (the string opened on the last line) is not
# visible in this extract -- confirm against the complete file before editing.
def test_local_path_vs_url(self):
3457
# The matcher location is a local path and the section names are urls
3458
sections = self.assertSectionIDs(['file:///foo/bar', 'file:///foo'],
3459
'/foo/bar/baz', '''\
3465
# Expects the '/foo/bar', '/foo' and no-name (None) sections, in that order,
# and checks the 'relpath' entry found in the `locals` of each section.
# NOTE(review): most of the store content and the closing quotes of the
# string are not visible in this extract -- confirm against the complete file
# before editing.
def test_no_name_section_included_when_present(self):
3466
# Note that other tests will cover the case where the no-name section
3467
# is empty and as such, not included.
3468
sections = self.assertSectionIDs(['/foo/bar', '/foo', None],
3469
'/foo/bar/baz', '''\
3470
option = defined so the no-name section exists
3474
self.assertEquals(['baz', 'bar/baz', '/foo/bar/baz'],
3475
[s.locals['relpath'] for _, s in sections])
3477
# Expects the '/foo/bar' and '/foo' sections, in that order, for location
# '/foo/bar/baz'.
# NOTE(review): the store content (the string opened on the last line) is not
# visible in this extract -- confirm against the complete file before editing.
def test_order_reversed(self):
3478
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3483
# Expects only the '/foo/bar' and '/foo' sections, in that order, for location
# '/foo/bar/baz'.
# NOTE(review): the store content (the string opened on the last line) is not
# visible in this extract -- confirm against the complete file before editing.
def test_unrelated_section_excluded(self):
3484
self.assertSectionIDs(['/foo/bar', '/foo'], '/foo/bar/baz', '''\
3490
# Expects the '/foo/*/baz', '/foo/b*' and '/foo' sections, in that order, for
# location '/foo/bar/baz' and checks the 'relpath' entry found in the `locals`
# of each section.
# NOTE(review): the store content and the closing quotes of the string are not
# visible in this extract -- confirm against the complete file before editing.
def test_glob_included(self):
3491
sections = self.assertSectionIDs(['/foo/*/baz', '/foo/b*', '/foo'],
3492
'/foo/bar/baz', '''\
3498
# Note that 'baz' as a relpath for /foo/b* is not fully correct, but
3499
# nothing really is... as far using {relpath} to append it to something
3500
# else, this seems good enough though.
3501
self.assertEquals(['', 'baz', 'bar/baz'],
3502
[s.locals['relpath'] for _, s in sections])
3504
# Expects the '/foo', '/foo/b*' and '/foo/*/baz' sections, in that order, for
# location '/foo/bar/baz'.
# NOTE(review): the store content (the string opened on the last line) is not
# visible in this extract -- confirm against the complete file before editing.
def test_respect_order(self):
3505
self.assertSectionIDs(['/foo', '/foo/b*', '/foo/*/baz'],
3506
'/foo/bar/baz', '''\
3514
class TestNameMatcher(TestStore):

    def setUp(self):
        super(TestNameMatcher, self).setUp()
        self.matcher = config.NameMatcher
        # Any simple store is good enough
        self.get_store = config.test_store_builder_registry.get('configobj')
3522
# Loads a content in a fresh store and returns the list of sections that
# `self.matcher` gives for `name`.
# NOTE(review): the string given to _load_from_string() and its closing
# quotes are not visible in this extract -- confirm against the complete file
# before editing.
def get_matching_sections(self, name):
3523
store = self.get_store(self)
3524
store._load_from_string('''
3532
matcher = self.matcher(store, name)
3533
return list(matcher.get_sections())
3535
def test_matching(self):
    """The name 'foo' selects the single 'foo' section."""
    matching = self.get_matching_sections('foo')
    self.assertLength(1, matching)
    expected = ('foo', {'option': 'foo'})
    self.assertSectionContent(expected, matching[0])
3540
def test_not_matching(self):
    """The name 'baz' selects no section."""
    matching = self.get_matching_sections('baz')
    self.assertLength(0, matching)
3545
class TestBaseStackGet(tests.TestCase):
3548
super(TestBaseStackGet, self).setUp()
3549
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3551
def test_get_first_definition(self):
3552
store1 = config.IniFileStore()
3553
store1._load_from_string('foo=bar')
3554
store2 = config.IniFileStore()
3555
store2._load_from_string('foo=baz')
3556
conf = config.Stack([store1.get_sections, store2.get_sections])
3557
self.assertEquals('bar', conf.get('foo'))
3559
def test_get_with_registered_default_value(self):
3560
config.option_registry.register(config.Option('foo', default='bar'))
3561
conf_stack = config.Stack([])
3562
self.assertEquals('bar', conf_stack.get('foo'))
3564
def test_get_without_registered_default_value(self):
3565
config.option_registry.register(config.Option('foo'))
3566
conf_stack = config.Stack([])
3567
self.assertEquals(None, conf_stack.get('foo'))
3569
def test_get_without_default_value_for_not_registered(self):
3570
conf_stack = config.Stack([])
3571
self.assertEquals(None, conf_stack.get('foo'))
3573
def test_get_for_empty_section_callable(self):
3574
conf_stack = config.Stack([lambda : []])
3575
self.assertEquals(None, conf_stack.get('foo'))
3577
def test_get_for_broken_callable(self):
3578
# Trying to use an invalid callable raises an exception on first use
3579
conf_stack = config.Stack([object])
3580
self.assertRaises(TypeError, conf_stack.get, 'foo')
3583
class TestStackWithSimpleStore(tests.TestCase):
3586
super(TestStackWithSimpleStore, self).setUp()
3587
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3588
self.registry = config.option_registry
3590
def get_conf(self, content=None):
3591
return config.MemoryStack(content)
3593
def test_override_value_from_env(self):
    option = config.Option('foo', default='bar',
                           override_from_env=['FOO'])
    self.registry.register(option)
    self.overrideEnv('FOO', 'quux')
    # The store says 'store' and the option default says 'bar', yet the
    # value found in the FOO environment variable is what must come back.
    stack = self.get_conf('foo=store')
    self.assertEquals('quux', stack.get('foo'))
3601
def test_first_override_value_from_env_wins(self):
    env_names = ['NO_VALUE', 'FOO', 'BAZ']
    option = config.Option('foo', default='bar',
                           override_from_env=env_names)
    self.registry.register(option)
    # NO_VALUE is deliberately left alone; FOO and BAZ both get a value.
    self.overrideEnv('FOO', 'foo')
    self.overrideEnv('BAZ', 'baz')
    # Among the listed variables, the first one that is set wins.
    stack = self.get_conf('foo=store')
    self.assertEquals('foo', stack.get('foo'))
3612
class TestMemoryStack(tests.TestCase):
3615
conf = config.MemoryStack('foo=bar')
3616
self.assertEquals('bar', conf.get('foo'))
3619
conf = config.MemoryStack('foo=bar')
3620
conf.set('foo', 'baz')
3621
self.assertEquals('baz', conf.get('foo'))
3623
def test_no_content(self):
    stack = config.MemoryStack()
    # Without any content, nothing has been loaded into the store ...
    self.assertFalse(stack.store.is_loaded())
    # ... and looking an option up is expected to fail.
    self.assertRaises(NotImplementedError, stack.get, 'foo')
    # Content can still be fed to the store afterwards.
    stack.store._load_from_string('foo=bar')
    self.assertEquals('bar', stack.get('foo'))
3633
class TestStackWithTransport(tests.TestCaseWithTransport):
3635
scenarios = [(key, {'get_stack': builder}) for key, builder
3636
in config.test_stack_builder_registry.iteritems()]
3639
class TestConcreteStacks(TestStackWithTransport):
    """Smoke tests for the stack builders provided through ``get_stack``."""

    def test_build_stack(self):
        # Just a smoke test to help debug builders: building the stack
        # without an error is all that is checked, so the result is not
        # bound to a name.
        self.get_stack(self)
3646
class TestStackGet(TestStackWithTransport):
3649
super(TestStackGet, self).setUp()
3650
self.conf = self.get_stack(self)
3652
def test_get_for_empty_stack(self):
3653
self.assertEquals(None, self.conf.get('foo'))
3655
def test_get_hook(self):
3656
self.conf.set('foo', 'bar')
3660
config.ConfigHooks.install_named_hook('get', hook, None)
3661
self.assertLength(0, calls)
3662
value = self.conf.get('foo')
3663
self.assertEquals('bar', value)
3664
self.assertLength(1, calls)
3665
self.assertEquals((self.conf, 'foo', 'bar'), calls[0])
3668
class TestStackGetWithConverter(tests.TestCase):
3671
super(TestStackGetWithConverter, self).setUp()
3672
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3673
self.registry = config.option_registry
3675
def get_conf(self, content=None):
3676
return config.MemoryStack(content)
3678
def register_bool_option(self, name, default=None, default_from_env=None):
    """Register a boolean option in ``self.registry``.

    :param name: The name of the option to register.

    :param default: Handed to ``config.Option`` as ``default``.

    :param default_from_env: Handed to ``config.Option`` as
        ``default_from_env``.
    """
    self.registry.register(
        config.Option(name, help='A boolean.',
                      default=default,
                      default_from_env=default_from_env,
                      from_unicode=config.bool_from_store))
3684
def test_get_default_bool_None(self):
3685
self.register_bool_option('foo')
3686
conf = self.get_conf('')
3687
self.assertEquals(None, conf.get('foo'))
3689
def test_get_default_bool_True(self):
3690
self.register_bool_option('foo', u'True')
3691
conf = self.get_conf('')
3692
self.assertEquals(True, conf.get('foo'))
3694
def test_get_default_bool_False(self):
3695
self.register_bool_option('foo', False)
3696
conf = self.get_conf('')
3697
self.assertEquals(False, conf.get('foo'))
3699
def test_get_default_bool_False_as_string(self):
3700
self.register_bool_option('foo', u'False')
3701
conf = self.get_conf('')
3702
self.assertEquals(False, conf.get('foo'))
3704
def test_get_default_bool_from_env_converted(self):
3705
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3706
self.overrideEnv('FOO', 'False')
3707
conf = self.get_conf('')
3708
self.assertEquals(False, conf.get('foo'))
3710
def test_get_default_bool_when_conversion_fails(self):
3711
self.register_bool_option('foo', default='True')
3712
conf = self.get_conf('foo=invalid boolean')
3713
self.assertEquals(True, conf.get('foo'))
3715
def register_integer_option(self, name,
                            default=None, default_from_env=None):
    """Register an integer option in ``self.registry``.

    :param name: The name of the option to register.

    :param default: Handed to ``config.Option`` as ``default``.

    :param default_from_env: Handed to ``config.Option`` as
        ``default_from_env``.
    """
    self.registry.register(
        config.Option(name, help='An integer.',
                      default=default,
                      default_from_env=default_from_env,
                      from_unicode=config.int_from_store))
3722
def test_get_default_integer_None(self):
3723
self.register_integer_option('foo')
3724
conf = self.get_conf('')
3725
self.assertEquals(None, conf.get('foo'))
3727
def test_get_default_integer(self):
3728
self.register_integer_option('foo', 42)
3729
conf = self.get_conf('')
3730
self.assertEquals(42, conf.get('foo'))
3732
def test_get_default_integer_as_string(self):
3733
self.register_integer_option('foo', u'42')
3734
conf = self.get_conf('')
3735
self.assertEquals(42, conf.get('foo'))
3737
def test_get_default_integer_from_env(self):
3738
self.register_integer_option('foo', default_from_env=['FOO'])
3739
self.overrideEnv('FOO', '18')
3740
conf = self.get_conf('')
3741
self.assertEquals(18, conf.get('foo'))
3743
def test_get_default_integer_when_conversion_fails(self):
3744
self.register_integer_option('foo', default='12')
3745
conf = self.get_conf('foo=invalid integer')
3746
self.assertEquals(12, conf.get('foo'))
3748
def register_list_option(self, name, default=None, default_from_env=None):
    """Register a list option in ``self.registry``.

    :param name: The name of the option to register.

    :param default: Handed to ``config.ListOption`` as ``default``.

    :param default_from_env: Handed to ``config.ListOption`` as
        ``default_from_env``.
    """
    list_option = config.ListOption(
        name, help='A list.', default=default,
        default_from_env=default_from_env)
    self.registry.register(list_option)
3753
def test_get_default_list_None(self):
3754
self.register_list_option('foo')
3755
conf = self.get_conf('')
3756
self.assertEquals(None, conf.get('foo'))
3758
def test_get_default_list_empty(self):
3759
self.register_list_option('foo', '')
3760
conf = self.get_conf('')
3761
self.assertEquals([], conf.get('foo'))
3763
def test_get_default_list_from_env(self):
3764
self.register_list_option('foo', default_from_env=['FOO'])
3765
self.overrideEnv('FOO', '')
3766
conf = self.get_conf('')
3767
self.assertEquals([], conf.get('foo'))
3769
def test_get_with_list_converter_no_item(self):
3770
self.register_list_option('foo', None)
3771
conf = self.get_conf('foo=,')
3772
self.assertEquals([], conf.get('foo'))
3774
def test_get_with_list_converter_many_items(self):
3775
self.register_list_option('foo', None)
3776
conf = self.get_conf('foo=m,o,r,e')
3777
self.assertEquals(['m', 'o', 'r', 'e'], conf.get('foo'))
3779
def test_get_with_list_converter_embedded_spaces_many_items(self):
3780
self.register_list_option('foo', None)
3781
conf = self.get_conf('foo=" bar", "baz "')
3782
self.assertEquals([' bar', 'baz '], conf.get('foo'))
3784
def test_get_with_list_converter_stripped_spaces_many_items(self):
3785
self.register_list_option('foo', None)
3786
conf = self.get_conf('foo= bar , baz ')
3787
self.assertEquals(['bar', 'baz'], conf.get('foo'))
3790
class TestIterOptionRefs(tests.TestCase):
3791
"""iter_option_refs is a bit unusual, document some cases."""
3793
def assertRefs(self, expected, string):
    """Check what ``config.iter_option_refs`` produces for ``string``.

    :param expected: The list the iteration is expected to produce.

    :param string: The text handed to ``config.iter_option_refs``.
    """
    actual = list(config.iter_option_refs(string))
    self.assertEquals(expected, actual)
3796
def test_empty(self):
3797
self.assertRefs([(False, '')], '')
3799
def test_no_refs(self):
3800
self.assertRefs([(False, 'foo bar')], 'foo bar')
3802
def test_single_ref(self):
3803
self.assertRefs([(False, ''), (True, '{foo}'), (False, '')], '{foo}')
3805
def test_broken_ref(self):
3806
self.assertRefs([(False, '{foo')], '{foo')
3808
def test_embedded_ref(self):
3809
self.assertRefs([(False, '{'), (True, '{foo}'), (False, '}')],
3812
def test_two_refs(self):
3813
self.assertRefs([(False, ''), (True, '{foo}'),
3814
(False, ''), (True, '{bar}'),
3818
def test_newline_in_refs_are_not_matched(self):
3819
self.assertRefs([(False, '{\nxx}{xx\n}{{\n}}')], '{\nxx}{xx\n}{{\n}}')
3822
class TestStackExpandOptions(tests.TestCaseWithTransport):
3825
super(TestStackExpandOptions, self).setUp()
3826
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3827
self.registry = config.option_registry
3828
self.conf = build_branch_stack(self)
3830
def assertExpansion(self, expected, string, env=None):
    """Check the result of expanding ``string`` with ``self.conf``.

    :param expected: The value ``expand_options`` is expected to return.

    :param string: The text handed to ``self.conf.expand_options``.

    :param env: Passed through to ``expand_options`` unchanged.
    """
    expanded = self.conf.expand_options(string, env)
    self.assertEquals(expected, expanded)
3833
def test_no_expansion(self):
3834
self.assertExpansion('foo', 'foo')
3836
def test_expand_default_value(self):
3837
self.conf.store._load_from_string('bar=baz')
3838
self.registry.register(config.Option('foo', default=u'{bar}'))
3839
self.assertEquals('baz', self.conf.get('foo', expand=True))
3841
def test_expand_default_from_env(self):
3842
self.conf.store._load_from_string('bar=baz')
3843
self.registry.register(config.Option('foo', default_from_env=['FOO']))
3844
self.overrideEnv('FOO', '{bar}')
3845
self.assertEquals('baz', self.conf.get('foo', expand=True))
3847
def test_expand_default_on_failed_conversion(self):
3848
self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
3849
self.registry.register(
3850
config.Option('foo', default=u'{bar}',
3851
from_unicode=config.int_from_store))
3852
self.assertEquals(42, self.conf.get('foo', expand=True))
3854
def test_env_adding_options(self):
3855
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3857
def test_env_overriding_options(self):
3858
self.conf.store._load_from_string('foo=baz')
3859
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3861
def test_simple_ref(self):
3862
self.conf.store._load_from_string('foo=xxx')
3863
self.assertExpansion('xxx', '{foo}')
3865
def test_unknown_ref(self):
3866
self.assertRaises(errors.ExpandingUnknownOption,
3867
self.conf.expand_options, '{foo}')
3869
def test_indirect_ref(self):
3870
self.conf.store._load_from_string('''
3874
self.assertExpansion('xxx', '{bar}')
3876
def test_embedded_ref(self):
3877
self.conf.store._load_from_string('''
3881
self.assertExpansion('xxx', '{{bar}}')
3883
def test_simple_loop(self):
3884
self.conf.store._load_from_string('foo={foo}')
3885
self.assertRaises(errors.OptionExpansionLoop,
3886
self.conf.expand_options, '{foo}')
3888
def test_indirect_loop(self):
3889
self.conf.store._load_from_string('''
3893
e = self.assertRaises(errors.OptionExpansionLoop,
3894
self.conf.expand_options, '{foo}')
3895
self.assertEquals('foo->bar->baz', e.refs)
3896
self.assertEquals('{foo}', e.string)
3898
def test_list(self):
3899
self.conf.store._load_from_string('''
3903
list={foo},{bar},{baz}
3905
self.registry.register(
3906
config.ListOption('list'))
3907
self.assertEquals(['start', 'middle', 'end'],
3908
self.conf.get('list', expand=True))
3910
def test_cascading_list(self):
3911
self.conf.store._load_from_string('''
3917
self.registry.register(
3918
config.ListOption('list'))
3919
self.assertEquals(['start', 'middle', 'end'],
3920
self.conf.get('list', expand=True))
3922
def test_pathologically_hidden_list(self):
3923
self.conf.store._load_from_string('''
3929
hidden={start}{middle}{end}
3931
# What matters is what the registration says, the conversion happens
3932
# only after all expansions have been performed
3933
self.registry.register(config.ListOption('hidden'))
3934
self.assertEquals(['bin', 'go'],
3935
self.conf.get('hidden', expand=True))
3938
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3941
super(TestStackCrossSectionsExpand, self).setUp()
3943
def get_config(self, location, string):
3946
# Since we don't save the config we aren't strictly required to inherit
3947
# from TestCaseInTempDir, but an error occurs so quickly...
3948
c = config.LocationStack(location)
3949
c.store._load_from_string(string)
3952
def test_dont_cross_unrelated_section(self):
3953
c = self.get_config('/another/branch/path','''
3958
[/another/branch/path]
3961
self.assertRaises(errors.ExpandingUnknownOption,
3962
c.get, 'bar', expand=True)
3964
def test_cross_related_sections(self):
3965
c = self.get_config('/project/branch/path','''
3969
[/project/branch/path]
3972
self.assertEquals('quux', c.get('bar', expand=True))
3975
class TestStackCrossStoresExpand(tests.TestCaseWithTransport):
3977
def test_cross_global_locations(self):
3978
l_store = config.LocationStore()
3979
l_store._load_from_string('''
3985
g_store = config.GlobalStore()
3986
g_store._load_from_string('''
3992
stack = config.LocationStack('/branch')
3993
self.assertEquals('glob-bar', stack.get('lbar', expand=True))
3994
self.assertEquals('loc-foo', stack.get('gfoo', expand=True))
3997
class TestStackExpandSectionLocals(tests.TestCaseWithTransport):
3999
def test_expand_locals_empty(self):
4000
l_store = config.LocationStore()
4001
l_store._load_from_string('''
4002
[/home/user/project]
4007
stack = config.LocationStack('/home/user/project/')
4008
self.assertEquals('', stack.get('base', expand=True))
4009
self.assertEquals('', stack.get('rel', expand=True))
4011
def test_expand_basename_locally(self):
4012
l_store = config.LocationStore()
4013
l_store._load_from_string('''
4014
[/home/user/project]
4018
stack = config.LocationStack('/home/user/project/branch')
4019
self.assertEquals('branch', stack.get('bfoo', expand=True))
4021
def test_expand_basename_locally_longer_path(self):
4022
l_store = config.LocationStore()
4023
l_store._load_from_string('''
4028
stack = config.LocationStack('/home/user/project/dir/branch')
4029
self.assertEquals('branch', stack.get('bfoo', expand=True))
4031
def test_expand_relpath_locally(self):
4032
l_store = config.LocationStore()
4033
l_store._load_from_string('''
4034
[/home/user/project]
4035
lfoo = loc-foo/{relpath}
4038
stack = config.LocationStack('/home/user/project/branch')
4039
self.assertEquals('loc-foo/branch', stack.get('lfoo', expand=True))
4041
def test_expand_relpath_unknonw_in_global(self):
4042
g_store = config.GlobalStore()
4043
g_store._load_from_string('''
4048
stack = config.LocationStack('/home/user/project/branch')
4049
self.assertRaises(errors.ExpandingUnknownOption,
4050
stack.get, 'gfoo', expand=True)
4052
def test_expand_local_option_locally(self):
4053
l_store = config.LocationStore()
4054
l_store._load_from_string('''
4055
[/home/user/project]
4056
lfoo = loc-foo/{relpath}
4060
g_store = config.GlobalStore()
4061
g_store._load_from_string('''
4067
stack = config.LocationStack('/home/user/project/branch')
4068
self.assertEquals('glob-bar', stack.get('lbar', expand=True))
4069
self.assertEquals('loc-foo/branch', stack.get('gfoo', expand=True))
4071
def test_locals_dont_leak(self):
4072
"""Make sure we choose the right local in the presence of several sections.
4074
l_store = config.LocationStore()
4075
l_store._load_from_string('''
4077
lfoo = loc-foo/{relpath}
4078
[/home/user/project]
4079
lfoo = loc-foo/{relpath}
4082
stack = config.LocationStack('/home/user/project/branch')
4083
self.assertEquals('loc-foo/branch', stack.get('lfoo', expand=True))
4084
stack = config.LocationStack('/home/user/bar/baz')
4085
self.assertEquals('loc-foo/bar/baz', stack.get('lfoo', expand=True))
4089
class TestStackSet(TestStackWithTransport):
4091
def test_simple_set(self):
    stack = self.get_stack(self)
    # Nothing is defined for 'foo' to start with.
    self.assertEquals(None, stack.get('foo'))
    stack.set('foo', 'baz')
    # The value just set must be the one read back.
    self.assertEquals('baz', stack.get('foo'))
4098
def test_set_creates_a_new_section(self):
    # Set an option on a freshly built stack and read it back.
    conf = self.get_stack(self)
    conf.set('foo', 'baz')
    # The assertion has to be *called*: the previous spelling
    # ``self.assertEquals, 'baz', conf.get('foo')`` only built a tuple and
    # threw it away, so the test could never fail.
    self.assertEquals('baz', conf.get('foo'))
4103
def test_set_hook(self):
4107
config.ConfigHooks.install_named_hook('set', hook, None)
4108
self.assertLength(0, calls)
4109
conf = self.get_stack(self)
4110
conf.set('foo', 'bar')
4111
self.assertLength(1, calls)
4112
self.assertEquals((conf, 'foo', 'bar'), calls[0])
4115
class TestStackRemove(TestStackWithTransport):
4117
def test_remove_existing(self):
4118
conf = self.get_stack(self)
4119
conf.set('foo', 'bar')
4120
self.assertEquals('bar', conf.get('foo'))
4122
# Did we get it back ?
4123
self.assertEquals(None, conf.get('foo'))
4125
def test_remove_unknown(self):
4126
conf = self.get_stack(self)
4127
self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
4129
def test_remove_hook(self):
4133
config.ConfigHooks.install_named_hook('remove', hook, None)
4134
self.assertLength(0, calls)
4135
conf = self.get_stack(self)
4136
conf.set('foo', 'bar')
4138
self.assertLength(1, calls)
4139
self.assertEquals((conf, 'foo'), calls[0])
4142
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
4145
super(TestConfigGetOptions, self).setUp()
4146
create_configs(self)
4148
def test_no_variable(self):
4149
# Using branch should query branch, locations and bazaar
4150
self.assertOptions([], self.branch_config)
4152
def test_option_in_bazaar(self):
4153
self.bazaar_config.set_user_option('file', 'bazaar')
4154
self.assertOptions([('file', 'bazaar', 'DEFAULT', 'bazaar')],
4157
def test_option_in_locations(self):
4158
self.locations_config.set_user_option('file', 'locations')
4160
[('file', 'locations', self.tree.basedir, 'locations')],
4161
self.locations_config)
4163
def test_option_in_branch(self):
4164
self.branch_config.set_user_option('file', 'branch')
4165
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
4168
def test_option_in_bazaar_and_branch(self):
4169
self.bazaar_config.set_user_option('file', 'bazaar')
4170
self.branch_config.set_user_option('file', 'branch')
4171
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
4172
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4175
def test_option_in_branch_and_locations(self):
4176
# Hmm, locations override branch :-/
4177
self.locations_config.set_user_option('file', 'locations')
4178
self.branch_config.set_user_option('file', 'branch')
4180
[('file', 'locations', self.tree.basedir, 'locations'),
4181
('file', 'branch', 'DEFAULT', 'branch'),],
4184
def test_option_in_bazaar_locations_and_branch(self):
4185
self.bazaar_config.set_user_option('file', 'bazaar')
4186
self.locations_config.set_user_option('file', 'locations')
4187
self.branch_config.set_user_option('file', 'branch')
4189
[('file', 'locations', self.tree.basedir, 'locations'),
4190
('file', 'branch', 'DEFAULT', 'branch'),
4191
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4195
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
4198
super(TestConfigRemoveOption, self).setUp()
4199
create_configs_with_file_option(self)
4201
def test_remove_in_locations(self):
4202
self.locations_config.remove_user_option('file', self.tree.basedir)
4204
[('file', 'branch', 'DEFAULT', 'branch'),
4205
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4208
def test_remove_in_branch(self):
4209
self.branch_config.remove_user_option('file')
4211
[('file', 'locations', self.tree.basedir, 'locations'),
4212
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
4215
def test_remove_in_bazaar(self):
4216
self.bazaar_config.remove_user_option('file')
4218
[('file', 'locations', self.tree.basedir, 'locations'),
4219
('file', 'branch', 'DEFAULT', 'branch'),],
4223
class TestConfigGetSections(tests.TestCaseWithTransport):
4226
super(TestConfigGetSections, self).setUp()
4227
create_configs(self)
4229
def assertSectionNames(self, expected, conf, name=None):
4230
"""Check which sections are returned for a given config.
4232
If fallback configurations exist their sections can be included.
4234
:param expected: A list of section names.
4236
:param conf: The configuration that will be queried.
4238
:param name: An optional section name that will be passed to
4241
sections = list(conf._get_sections(name))
4242
self.assertLength(len(expected), sections)
4243
self.assertEqual(expected, [name for name, _, _ in sections])
4245
def test_bazaar_default_section(self):
4246
self.assertSectionNames(['DEFAULT'], self.bazaar_config)
4248
def test_locations_default_section(self):
4249
# No sections are defined in an empty file
4250
self.assertSectionNames([], self.locations_config)
4252
def test_locations_named_section(self):
4253
self.locations_config.set_user_option('file', 'locations')
4254
self.assertSectionNames([self.tree.basedir], self.locations_config)
4256
def test_locations_matching_sections(self):
4257
loc_config = self.locations_config
4258
loc_config.set_user_option('file', 'locations')
4259
# We need to cheat a bit here to create an option in sections above and
4260
# below the 'location' one.
4261
parser = loc_config._get_parser()
4262
# locations.conf deals with '/' ignoring native os.sep
4263
location_names = self.tree.basedir.split('/')
4264
parent = '/'.join(location_names[:-1])
4265
child = '/'.join(location_names + ['child'])
4267
parser[parent]['file'] = 'parent'
4269
parser[child]['file'] = 'child'
4270
self.assertSectionNames([self.tree.basedir, parent], loc_config)
4272
def test_branch_data_default_section(self):
4273
self.assertSectionNames([None],
4274
self.branch_config._get_branch_data_config())
4276
def test_branch_default_sections(self):
4277
# No sections are defined in an empty locations file
4278
self.assertSectionNames([None, 'DEFAULT'],
4280
# Unless we define an option
4281
self.branch_config._get_location_config().set_user_option(
4282
'file', 'locations')
4283
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
4286
def test_bazaar_named_section(self):
4287
# We need to cheat as the API doesn't give direct access to sections
4288
# other than DEFAULT.
4289
self.bazaar_config.set_alias('bazaar', 'bzr')
4290
self.assertSectionNames(['ALIASES'], self.bazaar_config, 'ALIASES')
4293
class TestAuthenticationConfigFile(tests.TestCase):
4294
"""Test the authentication.conf file matching"""
4296
def _got_user_passwd(self, expected_user, expected_password,
4297
config, *args, **kwargs):
4298
credentials = config.get_credentials(*args, **kwargs)
4299
if credentials is None:
4303
user = credentials['user']
4304
password = credentials['password']
4305
self.assertEquals(expected_user, user)
4306
self.assertEquals(expected_password, password)
4308
def test_empty_config(self):
4309
conf = config.AuthenticationConfig(_file=StringIO())
4310
self.assertEquals({}, conf._get_config())
4311
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4313
def test_non_utf8_config(self):
4314
conf = config.AuthenticationConfig(_file=StringIO(
4316
self.assertRaises(errors.ConfigContentError, conf._get_config)
4318
def test_missing_auth_section_header(self):
4319
conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
4320
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4322
def test_auth_section_header_not_closed(self):
4323
conf = config.AuthenticationConfig(_file=StringIO('[DEF'))
4324
self.assertRaises(errors.ParseConfigError, conf._get_config)
4326
def test_auth_value_not_boolean(self):
4327
conf = config.AuthenticationConfig(_file=StringIO(
4331
verify_certificates=askme # Error: Not a boolean
4333
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4335
def test_auth_value_not_int(self):
4336
conf = config.AuthenticationConfig(_file=StringIO(
4340
port=port # Error: Not an int
4342
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
4344
def test_unknown_password_encoding(self):
4345
conf = config.AuthenticationConfig(_file=StringIO(
4349
password_encoding=unknown
4351
self.assertRaises(ValueError, conf.get_password,
4352
'ftp', 'foo.net', 'joe')
4354
def test_credentials_for_scheme_host(self):
4355
conf = config.AuthenticationConfig(_file=StringIO(
4356
"""# Identity on foo.net
4361
password=secret-pass
4364
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
4366
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
4368
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
4370
def test_credentials_for_host_port(self):
4371
conf = config.AuthenticationConfig(_file=StringIO(
4372
"""# Identity on foo.net
4378
password=secret-pass
4381
self._got_user_passwd('joe', 'secret-pass',
4382
conf, 'ftp', 'foo.net', port=10021)
4384
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
4386
def test_for_matching_host(self):
4387
conf = config.AuthenticationConfig(_file=StringIO(
4388
"""# Identity on foo.net
4394
[sourceforge domain]
4401
self._got_user_passwd('georges', 'bendover',
4402
conf, 'bzr', 'foo.bzr.sf.net')
4404
self._got_user_passwd(None, None,
4405
conf, 'bzr', 'bbzr.sf.net')
4407
def test_for_matching_host_None(self):
4408
conf = config.AuthenticationConfig(_file=StringIO(
4409
"""# Identity on foo.net
4419
self._got_user_passwd('joe', 'joepass',
4420
conf, 'bzr', 'quux.net')
4421
# no host but different scheme
4422
self._got_user_passwd('georges', 'bendover',
4423
conf, 'ftp', 'quux.net')
4425
def test_credentials_for_path(self):
4426
conf = config.AuthenticationConfig(_file=StringIO(
4442
self._got_user_passwd(None, None,
4443
conf, 'http', host='bar.org', path='/dir3')
4445
self._got_user_passwd('georges', 'bendover',
4446
conf, 'http', host='bar.org', path='/dir2')
4448
self._got_user_passwd('jim', 'jimpass',
4449
conf, 'http', host='bar.org',path='/dir1/subdir')
4451
def test_credentials_for_user(self):
4452
conf = config.AuthenticationConfig(_file=StringIO(
4461
self._got_user_passwd('jim', 'jimpass',
4462
conf, 'http', 'bar.org')
4464
self._got_user_passwd('jim', 'jimpass',
4465
conf, 'http', 'bar.org', user='jim')
4466
# Don't get a different user if one is specified
4467
self._got_user_passwd(None, None,
4468
conf, 'http', 'bar.org', user='georges')
4470
def test_credentials_for_user_without_password(self):
4471
conf = config.AuthenticationConfig(_file=StringIO(
4478
# Get user but no password
4479
self._got_user_passwd('jim', None,
4480
conf, 'http', 'bar.org')
4482
def test_verify_certificates(self):
4483
conf = config.AuthenticationConfig(_file=StringIO(
4490
verify_certificates=False
4497
credentials = conf.get_credentials('https', 'bar.org')
4498
self.assertEquals(False, credentials.get('verify_certificates'))
4499
credentials = conf.get_credentials('https', 'foo.net')
4500
self.assertEquals(True, credentials.get('verify_certificates'))
4503
class TestAuthenticationStorage(tests.TestCaseInTempDir):
4505
def test_set_credentials(self):
4506
conf = config.AuthenticationConfig()
4507
conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
4508
99, path='/foo', verify_certificates=False, realm='realm')
4509
credentials = conf.get_credentials(host='host', scheme='scheme',
4510
port=99, path='/foo',
4512
CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
4513
'verify_certificates': False, 'scheme': 'scheme',
4514
'host': 'host', 'port': 99, 'path': '/foo',
4516
self.assertEqual(CREDENTIALS, credentials)
4517
credentials_from_disk = config.AuthenticationConfig().get_credentials(
4518
host='host', scheme='scheme', port=99, path='/foo', realm='realm')
4519
self.assertEqual(CREDENTIALS, credentials_from_disk)
4521
def test_reset_credentials_different_name(self):
    # Credentials for the same scheme/host are stored twice under two
    # different section names; only the second section is expected to
    # remain afterwards.
    conf = config.AuthenticationConfig()
    # Plain statements: the stray trailing commas that used to follow
    # these calls wrapped each result in a throwaway one-element tuple.
    conf.set_credentials('name', 'host', 'user', 'scheme', 'password')
    conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password')
    # The section stored under the first name must be gone.
    self.assertIs(None, conf._get_config().get('name'))
    credentials = conf.get_credentials(host='host', scheme='scheme')
    CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
                   'password', 'verify_certificates': True,
                   'scheme': 'scheme', 'host': 'host', 'port': None,
                   'path': None, 'realm': None}
    self.assertEqual(CREDENTIALS, credentials)
4534
class TestAuthenticationConfig(tests.TestCase):
4535
"""Test AuthenticationConfig behaviour"""
4537
def _check_default_password_prompt(self, expected_prompt_format, scheme,
4538
host=None, port=None, realm=None,
4542
user, password = 'jim', 'precious'
4543
expected_prompt = expected_prompt_format % {
4544
'scheme': scheme, 'host': host, 'port': port,
4545
'user': user, 'realm': realm}
4547
stdout = tests.StringIOWrapper()
4548
stderr = tests.StringIOWrapper()
4549
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n',
4550
stdout=stdout, stderr=stderr)
4551
# We use an empty conf so that the user is always prompted
4552
conf = config.AuthenticationConfig()
4553
self.assertEquals(password,
4554
conf.get_password(scheme, host, user, port=port,
4555
realm=realm, path=path))
4556
self.assertEquals(expected_prompt, stderr.getvalue())
4557
self.assertEquals('', stdout.getvalue())
4559
def _check_default_username_prompt(self, expected_prompt_format, scheme,
4560
host=None, port=None, realm=None,
4565
expected_prompt = expected_prompt_format % {
4566
'scheme': scheme, 'host': host, 'port': port,
4568
stdout = tests.StringIOWrapper()
4569
stderr = tests.StringIOWrapper()
4570
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n',
4571
stdout=stdout, stderr=stderr)
4572
# We use an empty conf so that the user is always prompted
4573
conf = config.AuthenticationConfig()
4574
self.assertEquals(username, conf.get_user(scheme, host, port=port,
4575
realm=realm, path=path, ask=True))
4576
self.assertEquals(expected_prompt, stderr.getvalue())
4577
self.assertEquals('', stdout.getvalue())
4579
def test_username_defaults_prompts(self):
4580
# HTTP prompts can't be tested here, see test_http.py
4581
self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
4582
self._check_default_username_prompt(
4583
u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
4584
self._check_default_username_prompt(
4585
u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
4587
def test_username_default_no_prompt(self):
    conf = config.AuthenticationConfig()
    # Without a default the lookup is expected to give None ...
    without_default = conf.get_user('ftp', 'example.com')
    self.assertEquals(None, without_default)
    # ... while an explicit default is handed back as is.
    with_default = conf.get_user('ftp', 'example.com',
                                 default="explicitdefault")
    self.assertEquals("explicitdefault", with_default)
4594
def test_password_default_prompts(self):
4595
# HTTP prompts can't be tested here, see test_http.py
4596
self._check_default_password_prompt(
4597
u'FTP %(user)s@%(host)s password: ', 'ftp')
4598
self._check_default_password_prompt(
4599
u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
4600
self._check_default_password_prompt(
4601
u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
4602
# SMTP port handling is a bit special (it's handled if embedded in the
4604
# FIXME: should we: forbid that, extend it to other schemes, leave
4605
# things as they are that's fine thank you ?
4606
self._check_default_password_prompt(
4607
u'SMTP %(user)s@%(host)s password: ', 'smtp')
4608
self._check_default_password_prompt(
4609
u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
4610
self._check_default_password_prompt(
4611
u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
4613
def test_ssh_password_emits_warning(self):
4614
conf = config.AuthenticationConfig(_file=StringIO(
4622
entered_password = 'typed-by-hand'
4623
stdout = tests.StringIOWrapper()
4624
stderr = tests.StringIOWrapper()
4625
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n',
4626
stdout=stdout, stderr=stderr)
4628
# Since the password defined in the authentication config is ignored,
4629
# the user is prompted
4630
self.assertEquals(entered_password,
4631
conf.get_password('ssh', 'bar.org', user='jim'))
4632
self.assertContainsRe(
4634
'password ignored in section \[ssh with password\]')
4636
# Checks that, for an ssh section, get_password() returns the password typed
# at the prompt and that no 'password ignored in section' message is found.
# NOTE(review): the config text passed to StringIO, the remaining
# TestUIFactory arguments and the first argument of assertNotContainsRe are
# not visible here -- confirm against the full file.
def test_ssh_without_password_doesnt_emit_warning(self):
4637
conf = config.AuthenticationConfig(_file=StringIO(
4644
entered_password = 'typed-by-hand'
4645
stdout = tests.StringIOWrapper()
4646
stderr = tests.StringIOWrapper()
4647
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n',
4651
# No password is available from the authentication config here, so
4652
# the user is prompted
4653
self.assertEquals(entered_password,
4654
conf.get_password('ssh', 'bar.org', user='jim'))
4655
# No warning should be emitted since there is no password. We are only
4657
self.assertNotContainsRe(
4659
'password ignored in section \[ssh with password\]')
4661
def test_uses_fallback_stores(self):
    """Credentials held by a fallback store are returned by get_credentials().

    The module-level registry is swapped for a fresh one holding a single
    stub store registered with fallback=True; the AuthenticationConfig
    itself is built from an empty file.
    """
    fresh_registry = config.CredentialStoreRegistry()
    self.overrideAttr(config, 'credential_store_registry', fresh_registry)
    stub = StubCredentialStore()
    stub.add_credentials("http", "example.com", "joe", "secret")
    config.credential_store_registry.register("stub", stub, fallback=True)
    auth_conf = config.AuthenticationConfig(_file=StringIO())
    found = auth_conf.get_credentials("http", "example.com")
    self.assertEquals("joe", found["user"])
    self.assertEquals("secret", found["password"])
4673
# Test double for config.CredentialStore: add_credentials() records a user and
# a password per (scheme, host) pair, and get_credentials() returns them in a
# dict together with the scheme, host and port it was called with.
# NOTE(review): the initialisation of self._username / self._password and the
# branch taken for an unknown (scheme, host) key are not visible here --
# confirm against the full file.
class StubCredentialStore(config.CredentialStore):
4679
def add_credentials(self, scheme, host, user, password=None):
4680
self._username[(scheme, host)] = user
4681
self._password[(scheme, host)] = password
4683
def get_credentials(self, scheme, host, port=None, user=None,
4684
path=None, realm=None):
4685
key = (scheme, host)
4686
if not key in self._username:
4688
return { "scheme": scheme, "host": host, "port": port,
4689
"user": self._username[key], "password": self._password[key]}
4692
# Credential store test double. test_no_fallback asserts on its `_calls`
# attribute, so it presumably counts get_credentials() invocations.
# NOTE(review): __init__ and the body of get_credentials() are not visible
# here -- confirm against the full file.
class CountingCredentialStore(config.CredentialStore):
4697
def get_credentials(self, scheme, host, port=None, user=None,
4698
path=None, realm=None):
4703
class TestCredentialStoreRegistry(tests.TestCase):
    """Tests for config.CredentialStoreRegistry and the module-level registry."""

    def _get_cs_registry(self):
        """Return the module-level credential store registry."""
        return config.credential_store_registry

    def test_default_credential_store(self):
        """Asking for the ``None`` store yields a PlainTextCredentialStore."""
        r = self._get_cs_registry()
        default = r.get_credential_store(None)
        self.assertIsInstance(default, config.PlainTextCredentialStore)

    def test_unknown_credential_store(self):
        """Looking up a name that was never registered raises KeyError."""
        r = self._get_cs_registry()
        # It's hard to imagine someone creating a credential store named
        # 'unknown' so we use that as a never-registered key.
        self.assertRaises(KeyError, r.get_credential_store, 'unknown')

    def test_fallback_none_registered(self):
        """An empty registry has no fallback credentials to offer."""
        r = config.CredentialStoreRegistry()
        self.assertEquals(None,
                          r.get_fallback_credentials("http", "example.com"))

    def test_register(self):
        """Registered names are listed by keys(), fallback or not."""
        r = config.CredentialStoreRegistry()
        r.register("stub", StubCredentialStore(), fallback=False)
        r.register("another", StubCredentialStore(), fallback=True)
        self.assertEquals(["another", "stub"], r.keys())

    def test_register_lazy(self):
        """A lazily registered store is resolved on first lookup."""
        r = config.CredentialStoreRegistry()
        r.register_lazy("stub", "bzrlib.tests.test_config",
                        "StubCredentialStore", fallback=False)
        self.assertEquals(["stub"], r.keys())
        self.assertIsInstance(r.get_credential_store("stub"),
                              StubCredentialStore)

    def test_is_fallback(self):
        """is_fallback() reports the flag given at registration time."""
        r = config.CredentialStoreRegistry()
        r.register("stub1", None, fallback=False)
        r.register("stub2", None, fallback=True)
        self.assertEquals(False, r.is_fallback("stub1"))
        self.assertEquals(True, r.is_fallback("stub2"))

    def test_no_fallback(self):
        """A store registered with fallback=False is never consulted."""
        r = config.CredentialStoreRegistry()
        store = CountingCredentialStore()
        r.register("count", store, fallback=False)
        self.assertEquals(None,
                          r.get_fallback_credentials("http", "example.com"))
        self.assertEquals(0, store._calls)

    def test_fallback_credentials(self):
        """Credentials known to a fallback store are returned."""
        r = config.CredentialStoreRegistry()
        store = StubCredentialStore()
        store.add_credentials("http", "example.com",
                              "somebody", "geheim")
        r.register("stub", store, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEquals("somebody", creds["user"])
        self.assertEquals("geheim", creds["password"])

    def test_fallback_first_wins(self):
        """When two fallback stores match, the first one supplies the answer."""
        r = config.CredentialStoreRegistry()
        stub1 = StubCredentialStore()
        stub1.add_credentials("http", "example.com",
                              "somebody", "stub1")
        r.register("stub1", stub1, fallback=True)
        stub2 = StubCredentialStore()
        stub2.add_credentials("http", "example.com",
                              "somebody", "stub2")
        # Register stub2 itself (not stub1 a second time): otherwise both
        # names resolve to the same store and the test cannot tell which
        # one answered. keys() appears to be sorted (see test_register), so
        # "stub1" is expected to be consulted before "stub2".
        r.register("stub2", stub2, fallback=True)
        creds = r.get_fallback_credentials("http", "example.com")
        self.assertEquals("somebody", creds["user"])
        self.assertEquals("stub1", creds["password"])
4778
class TestPlainTextCredentialStore(tests.TestCase):
    """Tests for the store returned by default from the registry."""

    def test_decode_password(self):
        """decode_password() returns the 'password' entry unchanged."""
        registry = config.credential_store_registry
        store = registry.get_credential_store()
        credentials = dict(password='secret')
        result = store.decode_password(credentials)
        self.assertEquals('secret', result)
4787
# FIXME: Once we have a way to declare authentication to all test servers, we
4788
# can implement generic tests.
4789
# test_user_password_in_url
4790
# test_user_in_url_password_from_config
4791
# test_user_in_url_password_prompted
4792
# test_user_in_config
4793
# test_user_getpass.getuser
4794
# test_user_prompted ?
4795
class TestAuthenticationRing(tests.TestCaseWithTransport):
4799
# Exercises config._auto_user_id(): the test is skipped on win32, and when
# /etc/mailname exists both returned values must be non-None.
# NOTE(review): the docstring terminator and the branch holding the
# (None, None) assertion (presumably an `else:`) are not visible here --
# confirm against the full file.
class TestAutoUserId(tests.TestCase):
4800
"""Test inferring an automatic user name."""
4802
def test_auto_user_id(self):
4803
"""Automatic inference of user name.
4805
This is a bit hard to test in an isolated way, because it depends on
4806
system functions that go direct to /etc or perhaps somewhere else.
4807
But it's reasonable to say that on Unix, with an /etc/mailname, we ought
4808
to be able to choose a user name with no configuration.
4810
if sys.platform == 'win32':
4811
raise tests.TestSkipped(
4812
"User name inference not implemented on win32")
4813
realname, address = config._auto_user_id()
4814
if os.path.exists('/etc/mailname'):
4815
self.assertIsNot(None, realname)
4816
self.assertIsNot(None, address)
4818
self.assertEquals((None, None), (realname, address))
4821
class EmailOptionTests(tests.TestCase):
    """Checks how the 'email' option reacts to BZR_EMAIL and EMAIL."""

    def test_default_email_uses_BZR_EMAIL(self):
        """With both variables set, the BZR_EMAIL value is returned."""
        stack = config.MemoryStack('email=jelmer@debian.org')
        # BZR_EMAIL takes precedence over EMAIL
        environment = (('BZR_EMAIL', 'jelmer@samba.org'),
                       ('EMAIL', 'jelmer@apache.org'))
        for variable, value in environment:
            self.overrideEnv(variable, value)
        email = stack.get('email')
        self.assertEquals('jelmer@samba.org', email)

    def test_default_email_uses_EMAIL(self):
        """With BZR_EMAIL unset and nothing stored, EMAIL is returned."""
        stack = config.MemoryStack('')
        environment = (('BZR_EMAIL', None),
                       ('EMAIL', 'jelmer@apache.org'))
        for variable, value in environment:
            self.overrideEnv(variable, value)
        email = stack.get('email')
        self.assertEquals('jelmer@apache.org', email)

    def test_BZR_EMAIL_overrides(self):
        """BZR_EMAIL beats the stored value; EMAIL alone does not."""
        stack = config.MemoryStack('email=jelmer@debian.org')
        self.overrideEnv('BZR_EMAIL', 'jelmer@apache.org')
        overridden = stack.get('email')
        self.assertEquals('jelmer@apache.org', overridden)
        # Drop BZR_EMAIL and set only EMAIL: the stored value is expected.
        self.overrideEnv('BZR_EMAIL', None)
        self.overrideEnv('EMAIL', 'jelmer@samba.org')
        stored = stack.get('email')
        self.assertEquals('jelmer@debian.org', stored)