64
66
load_tests = scenarios.load_tests_apply_scenarios
66
# We need adapters that can build a config store in a test context. Test
67
# classes, based on TestCaseWithTransport, can use the registry to parametrize
68
# themselves. The builder will receive a test instance and should return a
69
# ready-to-use store. Plugins that define new stores can also register
70
# themselves here to be tested against the tests defined below.
72
# FIXME: plugins should *not* need to import test_config to register their
73
# helpers (or selftest -s xxx will be broken), the following registry should be
74
# moved to bzrlib.config instead so that selftest -s bt.test_config also runs
75
# the plugin specific tests (selftest -s bp.xxx won't, that would be against
76
# the spirit of '-s') -- vila 20110503
77
test_store_builder_registry = registry.Registry()
78
test_store_builder_registry.register(
68
# Register helpers to build stores
69
config.test_store_builder_registry.register(
79
70
'configobj', lambda test: config.IniFileStore(test.get_transport(),
81
test_store_builder_registry.register(
72
config.test_store_builder_registry.register(
82
73
'bazaar', lambda test: config.GlobalStore())
83
test_store_builder_registry.register(
74
config.test_store_builder_registry.register(
84
75
'location', lambda test: config.LocationStore())
85
test_store_builder_registry.register(
86
'branch', lambda test: config.BranchStore(test.branch))
78
def build_backing_branch(test, relpath,
                         transport_class=None, server_class=None):
    """Test helper to create a backing branch only once.

    Some tests need multiple stores/stacks to check concurrent update
    behaviours. As such, they need to build different branch *objects* even if
    they share the branch on disk.

    :param test: The test instance. The branch is built with
        ``test.make_branch`` and remembered as ``test.backing_branch`` so
        later calls do not build it again.

    :param relpath: The relative path to the branch. (Note that the helper
        should always specify the same relpath).

    :param transport_class: The Transport class the test needs to use.

    :param server_class: The server associated with the ``transport_class``.

    Either both or neither of ``transport_class`` and ``server_class`` should
    be specified.

    :raises AssertionError: If exactly one of ``transport_class`` and
        ``server_class`` is specified.
    """
    if transport_class is not None and server_class is not None:
        # Both given: configure the test before the branch is created.
        test.transport_class = transport_class
        test.transport_server = server_class
    elif not (transport_class is None and server_class is None):
        raise AssertionError('Specify both ``transport_class`` and '
                             '``server_class`` or neither of them')
    if getattr(test, 'backing_branch', None) is None:
        # First call, let's build the branch on disk
        test.backing_branch = test.make_branch(relpath)
108
def build_branch_store(test):
    """Build a ``config.BranchStore`` on the backing branch of ``test``.

    :param test: The test instance, handed to ``build_backing_branch`` with
        the relative path 'branch'.

    :return: A ``config.BranchStore`` wrapping the object returned by
        ``branch.Branch.open('branch')``.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStore(b)
112
config.test_store_builder_registry.register('branch', build_branch_store)
115
def build_control_store(test):
    """Build a ``config.ControlStore`` on the backing branch of ``test``.

    :param test: The test instance, handed to ``build_backing_branch`` with
        the relative path 'branch'.

    :return: A ``config.ControlStore`` wrapping the object returned by
        ``bzrdir.BzrDir.open('branch')``.
    """
    build_backing_branch(test, 'branch')
    b = bzrdir.BzrDir.open('branch')
    return config.ControlStore(b)
119
config.test_store_builder_registry.register('control', build_control_store)
122
def build_remote_branch_store(test):
    """Build a ``config.BranchStore`` on a branch opened via ``test.get_url``.

    The transport and server classes come from the first entry of
    ``transport_remote.get_test_permutations()`` and are passed on to
    ``build_backing_branch``.

    :param test: The test instance.

    :return: A ``config.BranchStore`` wrapping the branch opened at
        ``test.get_url('branch')``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.BranchStore(b)
130
config.test_store_builder_registry.register('remote_branch',
131
build_remote_branch_store)
134
config.test_stack_builder_registry.register(
135
'bazaar', lambda test: config.GlobalStack())
136
config.test_stack_builder_registry.register(
137
'location', lambda test: config.LocationStack('.'))
140
def build_branch_stack(test):
    """Build a ``config.BranchStack`` on the backing branch of ``test``.

    :param test: The test instance, handed to ``build_backing_branch`` with
        the relative path 'branch'.

    :return: A ``config.BranchStack`` wrapping the object returned by
        ``branch.Branch.open('branch')``.
    """
    build_backing_branch(test, 'branch')
    b = branch.Branch.open('branch')
    return config.BranchStack(b)
144
config.test_stack_builder_registry.register('branch', build_branch_stack)
147
def build_remote_branch_stack(test):
    """Build a ``config.RemoteBranchStack`` on a branch opened by URL.

    The transport and server classes come from the first entry of
    ``transport_remote.get_test_permutations()`` and are passed on to
    ``build_backing_branch``.

    :param test: The test instance.

    :return: A ``config.RemoteBranchStack`` wrapping the branch opened at
        ``test.get_url('branch')``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.RemoteBranchStack(b)
155
config.test_stack_builder_registry.register('remote_branch',
156
build_remote_branch_stack)
158
def build_remote_control_stack(test):
    """Build a ``config.RemoteControlStack`` on the bzrdir of a remote branch.

    The transport and server classes come from the first entry of
    ``transport_remote.get_test_permutations()`` and are passed on to
    ``build_backing_branch``.

    :param test: The test instance.

    :return: A ``config.RemoteControlStack`` wrapping the ``bzrdir`` attribute
        of the branch opened at ``test.get_url('branch')``.
    """
    # There is only one permutation (but we won't be able to handle more with
    # this design anyway)
    (transport_class,
     server_class) = transport_remote.get_test_permutations()[0]
    # We need only a bzrdir for this, not a full branch, but it's not worth
    # creating a dedicated helper to create only the bzrdir
    build_backing_branch(test, 'branch', transport_class, server_class)
    b = branch.Branch.open(test.get_url('branch'))
    return config.RemoteControlStack(b.bzrdir)
168
config.test_stack_builder_registry.register('remote_control',
169
build_remote_control_stack)
90
172
sample_long_alias="log -r-15..-1 --line"
1838
2028
self.assertIs(None, bzrdir_config.get_default_stack_on())
2031
class TestOldConfigHooks(tests.TestCaseWithTransport):
2034
super(TestOldConfigHooks, self).setUp()
2035
create_configs_with_file_option(self)
2037
def assertGetHook(self, conf, name, value):
2041
config.OldConfigHooks.install_named_hook('get', hook, None)
2043
config.OldConfigHooks.uninstall_named_hook, 'get', None)
2044
self.assertLength(0, calls)
2045
actual_value = conf.get_user_option(name)
2046
self.assertEquals(value, actual_value)
2047
self.assertLength(1, calls)
2048
self.assertEquals((conf, name, value), calls[0])
2050
def test_get_hook_bazaar(self):
2051
self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
2053
def test_get_hook_locations(self):
2054
self.assertGetHook(self.locations_config, 'file', 'locations')
2056
def test_get_hook_branch(self):
2057
# Since locations masks branch, we define a different option
2058
self.branch_config.set_user_option('file2', 'branch')
2059
self.assertGetHook(self.branch_config, 'file2', 'branch')
2061
def assertSetHook(self, conf, name, value):
2065
config.OldConfigHooks.install_named_hook('set', hook, None)
2067
config.OldConfigHooks.uninstall_named_hook, 'set', None)
2068
self.assertLength(0, calls)
2069
conf.set_user_option(name, value)
2070
self.assertLength(1, calls)
2071
# We can't assert the conf object below as different configs use
2072
# different means to implement set_user_option and we care only about
2074
self.assertEquals((name, value), calls[0][1:])
2076
def test_set_hook_bazaar(self):
2077
self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
2079
def test_set_hook_locations(self):
2080
self.assertSetHook(self.locations_config, 'foo', 'locations')
2082
def test_set_hook_branch(self):
2083
self.assertSetHook(self.branch_config, 'foo', 'branch')
2085
def assertRemoveHook(self, conf, name, section_name=None):
2089
config.OldConfigHooks.install_named_hook('remove', hook, None)
2091
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
2092
self.assertLength(0, calls)
2093
conf.remove_user_option(name, section_name)
2094
self.assertLength(1, calls)
2095
# We can't assert the conf object below as different configs use
2096
# different means to implement remove_user_option and we care only about
2098
self.assertEquals((name,), calls[0][1:])
2100
def test_remove_hook_bazaar(self):
2101
self.assertRemoveHook(self.bazaar_config, 'file')
2103
def test_remove_hook_locations(self):
2104
self.assertRemoveHook(self.locations_config, 'file',
2105
self.locations_config.location)
2107
def test_remove_hook_branch(self):
2108
self.assertRemoveHook(self.branch_config, 'file')
2110
def assertLoadHook(self, name, conf_class, *conf_args):
2114
config.OldConfigHooks.install_named_hook('load', hook, None)
2116
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2117
self.assertLength(0, calls)
2119
conf = conf_class(*conf_args)
2120
# Access an option to trigger a load
2121
conf.get_user_option(name)
2122
self.assertLength(1, calls)
2123
# Since we can't assert about conf, we just use the number of calls ;-/
2125
def test_load_hook_bazaar(self):
2126
self.assertLoadHook('file', config.GlobalConfig)
2128
def test_load_hook_locations(self):
2129
self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
2131
def test_load_hook_branch(self):
2132
self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
2134
def assertSaveHook(self, conf):
2138
config.OldConfigHooks.install_named_hook('save', hook, None)
2140
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2141
self.assertLength(0, calls)
2142
# Setting an option triggers a save
2143
conf.set_user_option('foo', 'bar')
2144
self.assertLength(1, calls)
2145
# Since we can't assert about conf, we just use the number of calls ;-/
2147
def test_save_hook_bazaar(self):
2148
self.assertSaveHook(self.bazaar_config)
2150
def test_save_hook_locations(self):
2151
self.assertSaveHook(self.locations_config)
2153
def test_save_hook_branch(self):
2154
self.assertSaveHook(self.branch_config)
2157
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
2158
"""Tests config hooks for remote configs.
2160
No tests for the remove hook as this is not implemented there.
2164
super(TestOldConfigHooksForRemote, self).setUp()
2165
self.transport_server = test_server.SmartTCPServer_for_testing
2166
create_configs_with_file_option(self)
2168
def assertGetHook(self, conf, name, value):
2172
config.OldConfigHooks.install_named_hook('get', hook, None)
2174
config.OldConfigHooks.uninstall_named_hook, 'get', None)
2175
self.assertLength(0, calls)
2176
actual_value = conf.get_option(name)
2177
self.assertEquals(value, actual_value)
2178
self.assertLength(1, calls)
2179
self.assertEquals((conf, name, value), calls[0])
2181
def test_get_hook_remote_branch(self):
2182
remote_branch = branch.Branch.open(self.get_url('tree'))
2183
self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
2185
def test_get_hook_remote_bzrdir(self):
2186
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2187
conf = remote_bzrdir._get_config()
2188
conf.set_option('remotedir', 'file')
2189
self.assertGetHook(conf, 'file', 'remotedir')
2191
def assertSetHook(self, conf, name, value):
2195
config.OldConfigHooks.install_named_hook('set', hook, None)
2197
config.OldConfigHooks.uninstall_named_hook, 'set', None)
2198
self.assertLength(0, calls)
2199
conf.set_option(value, name)
2200
self.assertLength(1, calls)
2201
# We can't assert the conf object below as different configs use
2202
# different means to implement set_user_option and we care only about
2204
self.assertEquals((name, value), calls[0][1:])
2206
def test_set_hook_remote_branch(self):
2207
remote_branch = branch.Branch.open(self.get_url('tree'))
2208
self.addCleanup(remote_branch.lock_write().unlock)
2209
self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
2211
def test_set_hook_remote_bzrdir(self):
2212
remote_branch = branch.Branch.open(self.get_url('tree'))
2213
self.addCleanup(remote_branch.lock_write().unlock)
2214
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2215
self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
2217
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
2221
config.OldConfigHooks.install_named_hook('load', hook, None)
2223
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2224
self.assertLength(0, calls)
2226
conf = conf_class(*conf_args)
2227
# Access an option to trigger a load
2228
conf.get_option(name)
2229
self.assertLength(expected_nb_calls, calls)
2230
# Since we can't assert about conf, we just use the number of calls ;-/
2232
def test_load_hook_remote_branch(self):
2233
remote_branch = branch.Branch.open(self.get_url('tree'))
2234
self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
2236
def test_load_hook_remote_bzrdir(self):
2237
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2238
# The config file doesn't exist, set an option to force its creation
2239
conf = remote_bzrdir._get_config()
2240
conf.set_option('remotedir', 'file')
2241
# We get one call for the server and one call for the client, this is
2242
# caused by the differences in implementations between
2243
# SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
2244
# SmartServerBranchGetConfigFile (in smart/branch.py)
2245
self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
2247
def assertSaveHook(self, conf):
2251
config.OldConfigHooks.install_named_hook('save', hook, None)
2253
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2254
self.assertLength(0, calls)
2255
# Setting an option triggers a save
2256
conf.set_option('foo', 'bar')
2257
self.assertLength(1, calls)
2258
# Since we can't assert about conf, we just use the number of calls ;-/
2260
def test_save_hook_remote_branch(self):
2261
remote_branch = branch.Branch.open(self.get_url('tree'))
2262
self.addCleanup(remote_branch.lock_write().unlock)
2263
self.assertSaveHook(remote_branch._get_config())
2265
def test_save_hook_remote_bzrdir(self):
2266
remote_branch = branch.Branch.open(self.get_url('tree'))
2267
self.addCleanup(remote_branch.lock_write().unlock)
2268
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2269
self.assertSaveHook(remote_bzrdir._get_config())
2272
class TestOption(tests.TestCase):
2274
def test_default_value(self):
2275
opt = config.Option('foo', default='bar')
2276
self.assertEquals('bar', opt.get_default())
2278
def test_default_value_from_env(self):
2279
opt = config.Option('foo', default='bar', default_from_env=['FOO'])
2280
self.overrideEnv('FOO', 'quux')
2281
# Env variable provides a default taking over the option one
2282
self.assertEquals('quux', opt.get_default())
2284
def test_first_default_value_from_env_wins(self):
2285
opt = config.Option('foo', default='bar',
2286
default_from_env=['NO_VALUE', 'FOO', 'BAZ'])
2287
self.overrideEnv('FOO', 'foo')
2288
self.overrideEnv('BAZ', 'baz')
2289
# The first env var set wins
2290
self.assertEquals('foo', opt.get_default())
2292
def test_not_supported_list_default_value(self):
2293
self.assertRaises(AssertionError, config.Option, 'foo', default=[1])
2295
def test_not_supported_object_default_value(self):
2296
self.assertRaises(AssertionError, config.Option, 'foo',
2300
class TestOptionConverterMixin(object):
2302
def assertConverted(self, expected, opt, value):
2303
self.assertEquals(expected, opt.convert_from_unicode(value))
2305
def assertWarns(self, opt, value):
2308
warnings.append(args[0] % args[1:])
2309
self.overrideAttr(trace, 'warning', warning)
2310
self.assertEquals(None, opt.convert_from_unicode(value))
2311
self.assertLength(1, warnings)
2313
'Value "%s" is not valid for "%s"' % (value, opt.name),
2316
def assertErrors(self, opt, value):
2317
self.assertRaises(errors.ConfigOptionValueError,
2318
opt.convert_from_unicode, value)
2320
def assertConvertInvalid(self, opt, invalid_value):
2322
self.assertEquals(None, opt.convert_from_unicode(invalid_value))
2323
opt.invalid = 'warning'
2324
self.assertWarns(opt, invalid_value)
2325
opt.invalid = 'error'
2326
self.assertErrors(opt, invalid_value)
2329
class TestOptionWithBooleanConverter(tests.TestCase, TestOptionConverterMixin):
    """Exercise an option whose converter is ``config.bool_from_store``."""

    def get_option(self):
        """Return a fresh 'foo' option converted by ``bool_from_store``."""
        return config.Option('foo', help='A boolean.',
                             from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as a boolean
        self.assertConvertInvalid(opt, u'invalid-boolean')
        # A list of strings is never recognized as a boolean
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(True, opt, u'True')
        self.assertConverted(True, opt, u'1')
        self.assertConverted(False, opt, u'False')
2349
class TestOptionWithIntegerConverter(tests.TestCase, TestOptionConverterMixin):
    """Exercise an option whose converter is ``config.int_from_store``."""

    def get_option(self):
        """Return a fresh 'foo' option converted by ``int_from_store``."""
        return config.Option('foo', help='An integer.',
                             from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as an integer
        self.assertConvertInvalid(opt, u'forty-two')
        # A list of strings is never recognized as an integer
        self.assertConvertInvalid(opt, [u'a', u'list'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(16, opt, u'16')
2366
class TestOptionWithListConverter(tests.TestCase, TestOptionConverterMixin):
2368
def get_option(self):
2369
return config.Option('foo', help='A list.',
2370
from_unicode=config.list_from_store)
2372
def test_convert_invalid(self):
2373
# No string is invalid as all forms can be converted to a list
2376
def test_convert_valid(self):
2377
opt = self.get_option()
2378
# An empty string is an empty list
2379
self.assertConverted([], opt, '') # Using a bare str() just in case
2380
self.assertConverted([], opt, u'')
2382
self.assertConverted([u'True'], opt, u'True')
2384
self.assertConverted([u'42'], opt, u'42')
2386
self.assertConverted([u'bar'], opt, u'bar')
2387
# A list remains a list (configObj will turn a string containing commas
2388
# into a list, but that's not what we're testing here)
2389
self.assertConverted([u'foo', u'1', u'True'],
2390
opt, [u'foo', u'1', u'True'])
2393
class TestOptionConverterMixin(object):
2395
def assertConverted(self, expected, opt, value):
2396
self.assertEquals(expected, opt.convert_from_unicode(value))
2398
def assertWarns(self, opt, value):
2401
warnings.append(args[0] % args[1:])
2402
self.overrideAttr(trace, 'warning', warning)
2403
self.assertEquals(None, opt.convert_from_unicode(value))
2404
self.assertLength(1, warnings)
2406
'Value "%s" is not valid for "%s"' % (value, opt.name),
2409
def assertErrors(self, opt, value):
2410
self.assertRaises(errors.ConfigOptionValueError,
2411
opt.convert_from_unicode, value)
2413
def assertConvertInvalid(self, opt, invalid_value):
2415
self.assertEquals(None, opt.convert_from_unicode(invalid_value))
2416
opt.invalid = 'warning'
2417
self.assertWarns(opt, invalid_value)
2418
opt.invalid = 'error'
2419
self.assertErrors(opt, invalid_value)
2422
class TestOptionWithBooleanConverter(tests.TestCase, TestOptionConverterMixin):
    """Exercise an option whose converter is ``config.bool_from_store``."""

    def get_option(self):
        """Return a fresh 'foo' option converted by ``bool_from_store``."""
        return config.Option('foo', help='A boolean.',
                             from_unicode=config.bool_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as a boolean
        self.assertConvertInvalid(opt, u'invalid-boolean')
        # A list of strings is never recognized as a boolean
        self.assertConvertInvalid(opt, [u'not', u'a', u'boolean'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(True, opt, u'True')
        self.assertConverted(True, opt, u'1')
        self.assertConverted(False, opt, u'False')
2442
class TestOptionWithIntegerConverter(tests.TestCase, TestOptionConverterMixin):
    """Exercise an option whose converter is ``config.int_from_store``."""

    def get_option(self):
        """Return a fresh 'foo' option converted by ``int_from_store``."""
        return config.Option('foo', help='An integer.',
                             from_unicode=config.int_from_store)

    def test_convert_invalid(self):
        opt = self.get_option()
        # A string that is not recognized as an integer
        self.assertConvertInvalid(opt, u'forty-two')
        # A list of strings is never recognized as an integer
        self.assertConvertInvalid(opt, [u'a', u'list'])

    def test_convert_valid(self):
        opt = self.get_option()
        self.assertConverted(16, opt, u'16')
2460
class TestOptionWithListConverter(tests.TestCase, TestOptionConverterMixin):
2462
def get_option(self):
2463
return config.Option('foo', help='A list.',
2464
from_unicode=config.list_from_store)
2466
def test_convert_invalid(self):
2467
opt = self.get_option()
2468
# We don't even try to convert a list into a list, we only expect
2470
self.assertConvertInvalid(opt, [1])
2471
# No string is invalid as all forms can be converted to a list
2473
def test_convert_valid(self):
2474
opt = self.get_option()
2475
# An empty string is an empty list
2476
self.assertConverted([], opt, '') # Using a bare str() just in case
2477
self.assertConverted([], opt, u'')
2479
self.assertConverted([u'True'], opt, u'True')
2481
self.assertConverted([u'42'], opt, u'42')
2483
self.assertConverted([u'bar'], opt, u'bar')
2486
class TestOptionRegistry(tests.TestCase):
2489
super(TestOptionRegistry, self).setUp()
2490
# Always start with an empty registry
2491
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
2492
self.registry = config.option_registry
2494
def test_register(self):
2495
opt = config.Option('foo')
2496
self.registry.register(opt)
2497
self.assertIs(opt, self.registry.get('foo'))
2499
def test_registered_help(self):
2500
opt = config.Option('foo', help='A simple option')
2501
self.registry.register(opt)
2502
self.assertEquals('A simple option', self.registry.get_help('foo'))
2504
lazy_option = config.Option('lazy_foo', help='Lazy help')
2506
def test_register_lazy(self):
2507
self.registry.register_lazy('lazy_foo', self.__module__,
2508
'TestOptionRegistry.lazy_option')
2509
self.assertIs(self.lazy_option, self.registry.get('lazy_foo'))
2511
def test_registered_lazy_help(self):
2512
self.registry.register_lazy('lazy_foo', self.__module__,
2513
'TestOptionRegistry.lazy_option')
2514
self.assertEquals('Lazy help', self.registry.get_help('lazy_foo'))
2517
class TestRegisteredOptions(tests.TestCase):
2518
"""All registered options should verify some constraints."""
2520
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2521
in config.option_registry.iteritems()]
2524
super(TestRegisteredOptions, self).setUp()
2525
self.registry = config.option_registry
2527
def test_proper_name(self):
2528
# An option should be registered under its own name, this can't be
2529
# checked at registration time for the lazy ones.
2530
self.assertEquals(self.option_name, self.option.name)
2532
def test_help_is_set(self):
2533
option_help = self.registry.get_help(self.option_name)
2534
self.assertNotEquals(None, option_help)
2535
# Come on, think about the user, he really wants to know what the
2537
self.assertIsNot(None, option_help)
2538
self.assertNotEquals('', option_help)
1841
2541
class TestSection(tests.TestCase):
1843
2543
# FIXME: Parametrize so that all sections produced by Stores run these
1961
2657
self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2660
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2661
"""Simulate loading a config store with content of various encodings.
2663
All files produced by bzr are in utf8 content.
2665
Users may modify them manually and end up with a file that can't be
2666
loaded. We need to issue proper error messages in this case.
2669
invalid_utf8_char = '\xff'
2671
def test_load_utf8(self):
2672
"""Ensure we can load an utf8-encoded file."""
2673
t = self.get_transport()
2674
# From http://pad.lv/799212
2675
unicode_user = u'b\N{Euro Sign}ar'
2676
unicode_content = u'user=%s' % (unicode_user,)
2677
utf8_content = unicode_content.encode('utf8')
2678
# Store the raw content in the config file
2679
t.put_bytes('foo.conf', utf8_content)
2680
store = config.IniFileStore(t, 'foo.conf')
2682
stack = config.Stack([store.get_sections], store)
2683
self.assertEquals(unicode_user, stack.get('user'))
2685
def test_load_non_ascii(self):
2686
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2687
t = self.get_transport()
2688
t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2689
store = config.IniFileStore(t, 'foo.conf')
2690
self.assertRaises(errors.ConfigContentError, store.load)
2692
def test_load_erroneous_content(self):
2693
"""Ensure we display a proper error on content that can't be parsed."""
2694
t = self.get_transport()
2695
t.put_bytes('foo.conf', '[open_section\n')
2696
store = config.IniFileStore(t, 'foo.conf')
2697
self.assertRaises(errors.ParseConfigError, store.load)
2699
def test_load_permission_denied(self):
2700
"""Ensure we get warned when trying to load an inaccessible file."""
2703
warnings.append(args[0] % args[1:])
2704
self.overrideAttr(trace, 'warning', warning)
2706
t = self.get_transport()
2708
def get_bytes(relpath):
2709
raise errors.PermissionDenied(relpath, "")
2710
t.get_bytes = get_bytes
2711
store = config.IniFileStore(t, 'foo.conf')
2712
self.assertRaises(errors.PermissionDenied, store.load)
2715
[u'Permission denied while trying to load configuration store %s.'
2716
% store.external_url()])
2719
class TestIniConfigContent(tests.TestCaseWithTransport):
    """Simulate loading an IniBasedConfig with content of various encodings.

    All files produced by bzr are in utf8 content.

    Users may modify them manually and end up with a file that can't be
    loaded. We need to issue proper error messages in this case.
    """

    # A byte used by the tests below as content that is not valid utf8.
    invalid_utf8_char = '\xff'

    def test_load_utf8(self):
        """Ensure we can load a utf8-encoded file."""
        # From http://pad.lv/799212
        unicode_user = u'b\N{Euro Sign}ar'
        unicode_content = u'user=%s' % (unicode_user,)
        utf8_content = unicode_content.encode('utf8')
        # Store the raw content in the config file
        with open('foo.conf', 'wb') as f:
            f.write(utf8_content)
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertEquals(unicode_user, conf.get_user_option('user'))

    def test_load_badly_encoded_content(self):
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
        with open('foo.conf', 'wb') as f:
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ConfigContentError, conf._get_parser)

    def test_load_erroneous_content(self):
        """Ensure we display a proper error on content that can't be parsed."""
        with open('foo.conf', 'wb') as f:
            f.write('[open_section\n')
        conf = config.IniBasedConfig(file_name='foo.conf')
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
1964
2757
class TestMutableStore(TestStore):
1966
scenarios = [(key, {'store_id': key, 'get_store': builder})
1967
for key, builder in test_store_builder_registry.iteritems()]
2759
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2760
in config.test_store_builder_registry.iteritems()]
1969
2762
def setUp(self):
1970
2763
super(TestMutableStore, self).setUp()
1971
2764
self.transport = self.get_transport()
1972
self.branch = self.make_branch('branch')
1974
2766
def has_store(self, store):
1975
2767
store_basename = urlutils.relative_url(self.transport.external_url(),
2095
2923
class TestLockableIniFileStore(TestStore):
2097
2925
def test_create_store_in_created_dir(self):
2926
self.assertPathDoesNotExist('dir')
2098
2927
t = self.get_transport('dir/subdir')
2099
2928
store = config.LockableIniFileStore(t, 'foo.conf')
2100
2929
store.get_mutable_section(None).set('foo', 'bar')
2103
# FIXME: We should adapt the tests in TestLockableConfig about concurrent
2104
# writes. Since this requires a clearer rewrite, I'll just rely on using
2105
# the same code in LockableIniFileStore (copied from LockableConfig, but
2106
# trivial enough, the main difference is that we add @needs_write_lock on
2107
# save() instead of set_user_option() and remove_user_option()). The intent
2108
# is to ensure that we always get a valid content for the store even when
2109
# concurrent accesses occur, read/write, write/write. It may be worth
2110
# looking into removing the lock dir when it's not needed anymore and look
2111
# at possible fallouts for concurrent lockers -- vila 20110-04-06
2931
self.assertPathExists('dir/subdir')
2934
class TestConcurrentStoreUpdates(TestStore):
2935
"""Test that Stores properly handle concurrent updates.
2937
New Store implementation may fail some of these tests but until such
2938
implementations exist it's hard to properly filter them from the scenarios
2939
applied here. If you encounter such a case, contact the bzr devs.
2942
scenarios = [(key, {'get_stack': builder}) for key, builder
2943
in config.test_stack_builder_registry.iteritems()]
2946
super(TestConcurrentStoreUpdates, self).setUp()
2947
self._content = 'one=1\ntwo=2\n'
2948
self.stack = self.get_stack(self)
2949
if not isinstance(self.stack, config._CompatibleStack):
2950
raise tests.TestNotApplicable(
2951
'%s is not meant to be compatible with the old config design'
2953
self.stack.store._load_from_string(self._content)
2955
self.stack.store.save()
2957
def test_simple_read_access(self):
2958
self.assertEquals('1', self.stack.get('one'))
2960
def test_simple_write_access(self):
2961
self.stack.set('one', 'one')
2962
self.assertEquals('one', self.stack.get('one'))
2964
def test_listen_to_the_last_speaker(self):
2966
c2 = self.get_stack(self)
2967
c1.set('one', 'ONE')
2968
c2.set('two', 'TWO')
2969
self.assertEquals('ONE', c1.get('one'))
2970
self.assertEquals('TWO', c2.get('two'))
2971
# The second update respects the first one
2972
self.assertEquals('ONE', c2.get('one'))
2974
def test_last_speaker_wins(self):
2975
# If the same config is not shared, the same variable modified twice
2976
# can only see a single result.
2978
c2 = self.get_stack(self)
2981
self.assertEquals('c2', c2.get('one'))
2982
# The first modification is still available until another refresh
2984
self.assertEquals('c1', c1.get('one'))
2985
c1.set('two', 'done')
2986
self.assertEquals('c2', c1.get('one'))
2988
def test_writes_are_serialized(self):
2990
c2 = self.get_stack(self)
2992
# We spawn a thread that will pause *during* the config saving.
2993
before_writing = threading.Event()
2994
after_writing = threading.Event()
2995
writing_done = threading.Event()
2996
c1_save_without_locking_orig = c1.store.save_without_locking
2997
def c1_save_without_locking():
2998
before_writing.set()
2999
c1_save_without_locking_orig()
3000
# The lock is held. We wait for the main thread to decide when to
3002
after_writing.wait()
3003
c1.store.save_without_locking = c1_save_without_locking
3007
t1 = threading.Thread(target=c1_set)
3008
# Collect the thread after the test
3009
self.addCleanup(t1.join)
3010
# Be ready to unblock the thread if the test goes wrong
3011
self.addCleanup(after_writing.set)
3013
before_writing.wait()
3014
self.assertRaises(errors.LockContention,
3015
c2.set, 'one', 'c2')
3016
self.assertEquals('c1', c1.get('one'))
3017
# Let the lock be released
3021
self.assertEquals('c2', c2.get('one'))
3023
def test_read_while_writing(self):
3025
# We spawn a thread that will pause *during* the write
3026
ready_to_write = threading.Event()
3027
do_writing = threading.Event()
3028
writing_done = threading.Event()
3029
# We override the _save implementation so we know the store is locked
3030
c1_save_without_locking_orig = c1.store.save_without_locking
3031
def c1_save_without_locking():
3032
ready_to_write.set()
3033
# The lock is held. We wait for the main thread to decide when to
3036
c1_save_without_locking_orig()
3038
c1.store.save_without_locking = c1_save_without_locking
3041
t1 = threading.Thread(target=c1_set)
3042
# Collect the thread after the test
3043
self.addCleanup(t1.join)
3044
# Be ready to unblock the thread if the test goes wrong
3045
self.addCleanup(do_writing.set)
3047
# Ensure the thread is ready to write
3048
ready_to_write.wait()
3049
self.assertEquals('c1', c1.get('one'))
3050
# If we read during the write, we get the old value
3051
c2 = self.get_stack(self)
3052
self.assertEquals('1', c2.get('one'))
3053
# Let the writing occur and ensure it occurred
3056
# Now we get the updated value
3057
c3 = self.get_stack(self)
3058
self.assertEquals('c1', c3.get('one'))
3060
# FIXME: It may be worth looking into removing the lock dir when it's not
3061
# needed anymore and look at possible fallouts for concurrent lockers. This
3062
# will matter if/when we use config files outside of bazaar directories
3063
# (.bazaar or .bzr) -- vila 20110-04-111
2114
3066
class TestSectionMatcher(TestStore):
2116
scenarios = [('location', {'matcher': config.LocationMatcher})]
3068
scenarios = [('location', {'matcher': config.LocationMatcher}),
3069
('id', {'matcher': config.NameMatcher}),]
2118
3071
def get_store(self, file_name):
2119
3072
return config.IniFileStore(self.get_readonly_transport(), file_name)
2215
3265
self.assertRaises(TypeError, conf_stack.get, 'foo')
2218
class TestStackSet(tests.TestCaseWithTransport):
2220
# FIXME: This should be parametrized for all known Stack or dedicated
2221
# parametrized tests created to avoid bloating -- vila 2011-04-05
3268
class TestStackWithTransport(tests.TestCaseWithTransport):
3270
scenarios = [(key, {'get_stack': builder}) for key, builder
3271
in config.test_stack_builder_registry.iteritems()]
3274
class TestConcreteStacks(TestStackWithTransport):
3276
def test_build_stack(self):
3277
# Just a smoke test to help debug builders
3278
stack = self.get_stack(self)
3281
class TestStackGet(TestStackWithTransport):
3284
super(TestStackGet, self).setUp()
3285
self.conf = self.get_stack(self)
3287
def test_get_for_empty_stack(self):
3288
self.assertEquals(None, self.conf.get('foo'))
3290
def test_get_hook(self):
3291
self.conf.store._load_from_string('foo=bar')
3295
config.ConfigHooks.install_named_hook('get', hook, None)
3296
self.assertLength(0, calls)
3297
value = self.conf.get('foo')
3298
self.assertEquals('bar', value)
3299
self.assertLength(1, calls)
3300
self.assertEquals((self.conf, 'foo', 'bar'), calls[0])
3303
class TestStackGetWithConverter(TestStackGet):
3306
super(TestStackGetWithConverter, self).setUp()
3307
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3308
self.registry = config.option_registry
3310
def register_bool_option(self, name, default=None, default_from_env=None):
3311
b = config.Option(name, help='A boolean.',
3312
default=default, default_from_env=default_from_env,
3313
from_unicode=config.bool_from_store)
3314
self.registry.register(b)
3316
def test_get_default_bool_None(self):
3317
self.register_bool_option('foo')
3318
self.assertEquals(None, self.conf.get('foo'))
3320
def test_get_default_bool_True(self):
3321
self.register_bool_option('foo', u'True')
3322
self.assertEquals(True, self.conf.get('foo'))
3324
def test_get_default_bool_False(self):
3325
self.register_bool_option('foo', False)
3326
self.assertEquals(False, self.conf.get('foo'))
3328
def test_get_default_bool_False_as_string(self):
3329
self.register_bool_option('foo', u'False')
3330
self.assertEquals(False, self.conf.get('foo'))
3332
def test_get_default_bool_from_env_converted(self):
3333
self.register_bool_option('foo', u'True', default_from_env=['FOO'])
3334
self.overrideEnv('FOO', 'False')
3335
self.assertEquals(False, self.conf.get('foo'))
3337
def test_get_default_bool_when_conversion_fails(self):
3338
self.register_bool_option('foo', default='True')
3339
self.conf.store._load_from_string('foo=invalid boolean')
3340
self.assertEquals(True, self.conf.get('foo'))
3342
def register_integer_option(self, name,
3343
default=None, default_from_env=None):
3344
i = config.Option(name, help='An integer.',
3345
default=default, default_from_env=default_from_env,
3346
from_unicode=config.int_from_store)
3347
self.registry.register(i)
3349
def test_get_default_integer_None(self):
3350
self.register_integer_option('foo')
3351
self.assertEquals(None, self.conf.get('foo'))
3353
def test_get_default_integer(self):
3354
self.register_integer_option('foo', 42)
3355
self.assertEquals(42, self.conf.get('foo'))
3357
def test_get_default_integer_as_string(self):
3358
self.register_integer_option('foo', u'42')
3359
self.assertEquals(42, self.conf.get('foo'))
3361
def test_get_default_integer_from_env(self):
3362
self.register_integer_option('foo', default_from_env=['FOO'])
3363
self.overrideEnv('FOO', '18')
3364
self.assertEquals(18, self.conf.get('foo'))
3366
def test_get_default_integer_when_conversion_fails(self):
3367
self.register_integer_option('foo', default='12')
3368
self.conf.store._load_from_string('foo=invalid integer')
3369
self.assertEquals(12, self.conf.get('foo'))
3371
def register_list_option(self, name, default=None, default_from_env=None):
3372
l = config.Option(name, help='A list.',
3373
default=default, default_from_env=default_from_env,
3374
from_unicode=config.list_from_store)
3375
self.registry.register(l)
3377
def test_get_default_list_None(self):
3378
self.register_list_option('foo')
3379
self.assertEquals(None, self.conf.get('foo'))
3381
def test_get_default_list_empty(self):
3382
self.register_list_option('foo', '')
3383
self.assertEquals([], self.conf.get('foo'))
3385
def test_get_default_list_from_env(self):
3386
self.register_list_option('foo', default_from_env=['FOO'])
3387
self.overrideEnv('FOO', '')
3388
self.assertEquals([], self.conf.get('foo'))
3390
def test_get_with_list_converter_no_item(self):
3391
self.register_list_option('foo', None)
3392
self.conf.store._load_from_string('foo=,')
3393
self.assertEquals([], self.conf.get('foo'))
3395
def test_get_with_list_converter_many_items(self):
3396
self.register_list_option('foo', None)
3397
self.conf.store._load_from_string('foo=m,o,r,e')
3398
self.assertEquals(['m', 'o', 'r', 'e'], self.conf.get('foo'))
3400
def test_get_with_list_converter_embedded_spaces_many_items(self):
3401
self.register_list_option('foo', None)
3402
self.conf.store._load_from_string('foo=" bar", "baz "')
3403
self.assertEquals([' bar', 'baz '], self.conf.get('foo'))
3405
def test_get_with_list_converter_stripped_spaces_many_items(self):
3406
self.register_list_option('foo', None)
3407
self.conf.store._load_from_string('foo= bar , baz ')
3408
self.assertEquals(['bar', 'baz'], self.conf.get('foo'))
3411
class TestStackExpandOptions(tests.TestCaseWithTransport):
3414
super(TestStackExpandOptions, self).setUp()
3415
self.overrideAttr(config, 'option_registry', config.OptionRegistry())
3416
self.registry = config.option_registry
3417
self.conf = build_branch_stack(self)
3419
def assertExpansion(self, expected, string, env=None):
3420
self.assertEquals(expected, self.conf.expand_options(string, env))
3422
def test_no_expansion(self):
3423
self.assertExpansion('foo', 'foo')
3425
def test_expand_default_value(self):
3426
self.conf.store._load_from_string('bar=baz')
3427
self.registry.register(config.Option('foo', default=u'{bar}'))
3428
self.assertEquals('baz', self.conf.get('foo', expand=True))
3430
def test_expand_default_from_env(self):
3431
self.conf.store._load_from_string('bar=baz')
3432
self.registry.register(config.Option('foo', default_from_env=['FOO']))
3433
self.overrideEnv('FOO', '{bar}')
3434
self.assertEquals('baz', self.conf.get('foo', expand=True))
3436
def test_expand_default_on_failed_conversion(self):
3437
self.conf.store._load_from_string('baz=bogus\nbar=42\nfoo={baz}')
3438
self.registry.register(
3439
config.Option('foo', default=u'{bar}',
3440
from_unicode=config.int_from_store))
3441
self.assertEquals(42, self.conf.get('foo', expand=True))
3443
def test_env_adding_options(self):
3444
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3446
def test_env_overriding_options(self):
3447
self.conf.store._load_from_string('foo=baz')
3448
self.assertExpansion('bar', '{foo}', {'foo': 'bar'})
3450
def test_simple_ref(self):
3451
self.conf.store._load_from_string('foo=xxx')
3452
self.assertExpansion('xxx', '{foo}')
3454
def test_unknown_ref(self):
3455
self.assertRaises(errors.ExpandingUnknownOption,
3456
self.conf.expand_options, '{foo}')
3458
def test_indirect_ref(self):
3459
self.conf.store._load_from_string('''
3463
self.assertExpansion('xxx', '{bar}')
3465
def test_embedded_ref(self):
3466
self.conf.store._load_from_string('''
3470
self.assertExpansion('xxx', '{{bar}}')
3472
def test_simple_loop(self):
3473
self.conf.store._load_from_string('foo={foo}')
3474
self.assertRaises(errors.OptionExpansionLoop,
3475
self.conf.expand_options, '{foo}')
3477
def test_indirect_loop(self):
3478
self.conf.store._load_from_string('''
3482
e = self.assertRaises(errors.OptionExpansionLoop,
3483
self.conf.expand_options, '{foo}')
3484
self.assertEquals('foo->bar->baz', e.refs)
3485
self.assertEquals('{foo}', e.string)
3487
def test_list(self):
3488
self.conf.store._load_from_string('''
3492
list={foo},{bar},{baz}
3494
self.registry.register(
3495
config.Option('list', from_unicode=config.list_from_store))
3496
self.assertEquals(['start', 'middle', 'end'],
3497
self.conf.get('list', expand=True))
3499
def test_cascading_list(self):
3500
self.conf.store._load_from_string('''
3506
self.registry.register(
3507
config.Option('list', from_unicode=config.list_from_store))
3508
self.assertEquals(['start', 'middle', 'end'],
3509
self.conf.get('list', expand=True))
3511
def test_pathologically_hidden_list(self):
3512
self.conf.store._load_from_string('''
3518
hidden={start}{middle}{end}
3520
# What matters is what the registration says, the conversion happens
3521
# only after all expansions have been performed
3522
self.registry.register(
3523
config.Option('hidden', from_unicode=config.list_from_store))
3524
self.assertEquals(['bin', 'go'],
3525
self.conf.get('hidden', expand=True))
3528
class TestStackCrossSectionsExpand(tests.TestCaseWithTransport):
3531
super(TestStackCrossSectionsExpand, self).setUp()
3533
def get_config(self, location, string):
3536
# Since we don't save the config we aren't strictly required to inherit
3537
# from TestCaseInTempDir, but an error occurs so quickly...
3538
c = config.LocationStack(location)
3539
c.store._load_from_string(string)
3542
def test_dont_cross_unrelated_section(self):
3543
c = self.get_config('/another/branch/path','''
3548
[/another/branch/path]
3551
self.assertRaises(errors.ExpandingUnknownOption,
3552
c.get, 'bar', expand=True)
3554
def test_cross_related_sections(self):
3555
c = self.get_config('/project/branch/path','''
3559
[/project/branch/path]
3562
self.assertEquals('quux', c.get('bar', expand=True))
3565
class TestStackSet(TestStackWithTransport):
2223
3567
def test_simple_set(self):
2224
store = config.IniFileStore(self.get_transport(), 'test.conf')
2225
store._load_from_string('foo=bar')
2226
conf = config.Stack([store.get_sections], store)
3568
conf = self.get_stack(self)
3569
conf.store._load_from_string('foo=bar')
2227
3570
self.assertEquals('bar', conf.get('foo'))
2228
3571
conf.set('foo', 'baz')
2229
3572
# Did we get it back ?
2230
3573
self.assertEquals('baz', conf.get('foo'))
2232
3575
def test_set_creates_a_new_section(self):
2233
store = config.IniFileStore(self.get_transport(), 'test.conf')
2234
conf = config.Stack([store.get_sections], store)
3576
conf = self.get_stack(self)
2235
3577
conf.set('foo', 'baz')
2236
3578
# Actually call the assertion: the previous form was a bare tuple
# expression (method, 'baz', value) that was evaluated and discarded, so
# the test could never fail.
self.assertEquals('baz', conf.get('foo'))
2239
class TestStackRemove(tests.TestCaseWithTransport):
2241
# FIXME: This should be parametrized for all known Stack or dedicated
2242
# parametrized tests created to avoid bloating -- vila 2011-04-06
3580
def test_set_hook(self):
3584
config.ConfigHooks.install_named_hook('set', hook, None)
3585
self.assertLength(0, calls)
3586
conf = self.get_stack(self)
3587
conf.set('foo', 'bar')
3588
self.assertLength(1, calls)
3589
self.assertEquals((conf, 'foo', 'bar'), calls[0])
3592
class TestStackRemove(TestStackWithTransport):
2244
3594
def test_remove_existing(self):
2245
store = config.IniFileStore(self.get_transport(), 'test.conf')
2246
store._load_from_string('foo=bar')
2247
conf = config.Stack([store.get_sections], store)
3595
conf = self.get_stack(self)
3596
conf.store._load_from_string('foo=bar')
2248
3597
self.assertEquals('bar', conf.get('foo'))
2249
3598
conf.remove('foo')
2250
3599
# Is it really gone?
2251
3600
self.assertEquals(None, conf.get('foo'))
2253
3602
def test_remove_unknown(self):
2254
store = config.IniFileStore(self.get_transport(), 'test.conf')
2255
conf = config.Stack([store.get_sections], store)
3603
conf = self.get_stack(self)
2256
3604
self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
3606
def test_remove_hook(self):
3610
config.ConfigHooks.install_named_hook('remove', hook, None)
3611
self.assertLength(0, calls)
3612
conf = self.get_stack(self)
3613
conf.store._load_from_string('foo=bar')
3615
self.assertLength(1, calls)
3616
self.assertEquals((conf, 'foo'), calls[0])
2259
3619
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):