512
170
my_config = config.Config()
513
171
self.assertEqual(None, my_config.post_commit())
515
def test_log_format_default(self):
516
my_config = config.Config()
517
self.assertEqual('long', my_config.log_format())
519
def test_acceptable_keys_default(self):
520
my_config = config.Config()
521
self.assertEqual(None, my_config.acceptable_keys())
523
def test_validate_signatures_in_log_default(self):
524
my_config = config.Config()
525
self.assertEqual(False, my_config.validate_signatures_in_log())
527
def test_get_change_editor(self):
528
my_config = InstrumentedConfig()
529
change_editor = my_config.get_change_editor('old_tree', 'new_tree')
530
self.assertEqual(['_get_change_editor'], my_config._calls)
531
self.assertIs(diff.DiffFromTool, change_editor.__class__)
532
self.assertEqual(['vimdiff', '-fo', '@new_path', '@old_path'],
533
change_editor.command_template)
536
class TestConfigPath(tests.TestCase):
174
class TestConfigPath(TestCase):
539
177
super(TestConfigPath, self).setUp()
540
self.overrideEnv('HOME', '/home/bogus')
541
self.overrideEnv('XDG_CACHE_DIR', '')
542
if sys.platform == 'win32':
544
'BZR_HOME', r'C:\Documents and Settings\bogus\Application Data')
546
'C:/Documents and Settings/bogus/Application Data/bazaar/2.0'
548
self.bzr_home = '/home/bogus/.bazaar'
178
self.old_home = os.environ.get('HOME', None)
179
self.old_appdata = os.environ.get('APPDATA', None)
180
os.environ['HOME'] = '/home/bogus'
181
os.environ['APPDATA'] = \
182
r'C:\Documents and Settings\bogus\Application Data'
185
if self.old_home is None:
186
del os.environ['HOME']
188
os.environ['HOME'] = self.old_home
189
if self.old_appdata is None:
190
del os.environ['APPDATA']
192
os.environ['APPDATA'] = self.old_appdata
193
super(TestConfigPath, self).tearDown()
550
195
def test_config_dir(self):
551
self.assertEqual(config.config_dir(), self.bzr_home)
196
if sys.platform == 'win32':
197
self.assertEqual(config.config_dir(),
198
r'C:\Documents and Settings\bogus\Application Data\bazaar\2.0')
200
self.assertEqual(config.config_dir(), '/home/bogus/.bazaar')
553
202
def test_config_filename(self):
554
self.assertEqual(config.config_filename(),
555
self.bzr_home + '/bazaar.conf')
557
def test_locations_config_filename(self):
558
self.assertEqual(config.locations_config_filename(),
559
self.bzr_home + '/locations.conf')
561
def test_authentication_config_filename(self):
562
self.assertEqual(config.authentication_config_filename(),
563
self.bzr_home + '/authentication.conf')
565
def test_xdg_cache_dir(self):
566
self.assertEqual(config.xdg_cache_dir(),
567
'/home/bogus/.cache')
570
class TestXDGConfigDir(tests.TestCaseInTempDir):
571
# must be in temp dir because config tests for the existence of the bazaar
572
# subdirectory of $XDG_CONFIG_HOME
575
if sys.platform in ('darwin', 'win32'):
576
raise tests.TestNotApplicable(
577
'XDG config dir not used on this platform')
578
super(TestXDGConfigDir, self).setUp()
579
self.overrideEnv('HOME', self.test_home_dir)
580
# BZR_HOME overrides everything we want to test so unset it.
581
self.overrideEnv('BZR_HOME', None)
583
def test_xdg_config_dir_exists(self):
584
"""When ~/.config/bazaar exists, use it as the config dir."""
585
newdir = osutils.pathjoin(self.test_home_dir, '.config', 'bazaar')
587
self.assertEqual(config.config_dir(), newdir)
589
def test_xdg_config_home(self):
590
"""When XDG_CONFIG_HOME is set, use it."""
591
xdgconfigdir = osutils.pathjoin(self.test_home_dir, 'xdgconfig')
592
self.overrideEnv('XDG_CONFIG_HOME', xdgconfigdir)
593
newdir = osutils.pathjoin(xdgconfigdir, 'bazaar')
595
self.assertEqual(config.config_dir(), newdir)
598
class TestIniConfig(tests.TestCaseInTempDir):
600
def make_config_parser(self, s):
601
conf = config.IniBasedConfig.from_string(s)
602
return conf, conf._get_parser()
605
class TestIniConfigBuilding(TestIniConfig):
203
if sys.platform == 'win32':
204
self.assertEqual(config.config_filename(),
205
r'C:\Documents and Settings\bogus\Application Data\bazaar\2.0\bazaar.conf')
207
self.assertEqual(config.config_filename(),
208
'/home/bogus/.bazaar/bazaar.conf')
210
def test_branches_config_filename(self):
211
if sys.platform == 'win32':
212
self.assertEqual(config.branches_config_filename(),
213
r'C:\Documents and Settings\bogus\Application Data\bazaar\2.0\branches.conf')
215
self.assertEqual(config.branches_config_filename(),
216
'/home/bogus/.bazaar/branches.conf')
218
class TestIniConfig(TestCase):
607
220
def test_contructs(self):
608
my_config = config.IniBasedConfig()
221
my_config = config.IniBasedConfig("nothing")
610
223
def test_from_fp(self):
611
my_config = config.IniBasedConfig.from_string(sample_config_text)
612
self.assertIsInstance(my_config._get_parser(), configobj.ConfigObj)
224
config_file = StringIO(sample_config_text)
225
my_config = config.IniBasedConfig(None)
227
isinstance(my_config._get_parser(file=config_file),
614
230
def test_cached(self):
615
my_config = config.IniBasedConfig.from_string(sample_config_text)
616
parser = my_config._get_parser()
617
self.assertTrue(my_config._get_parser() is parser)
619
def _dummy_chown(self, path, uid, gid):
620
self.path, self.uid, self.gid = path, uid, gid
622
def test_ini_config_ownership(self):
623
"""Ensure that chown is happening during _write_config_file"""
624
self.requireFeature(features.chown_feature)
625
self.overrideAttr(os, 'chown', self._dummy_chown)
626
self.path = self.uid = self.gid = None
627
conf = config.IniBasedConfig(file_name='./foo.conf')
628
conf._write_config_file()
629
self.assertEquals(self.path, './foo.conf')
630
self.assertTrue(isinstance(self.uid, int))
631
self.assertTrue(isinstance(self.gid, int))
633
def test_get_filename_parameter_is_deprecated_(self):
634
conf = self.callDeprecated([
635
'IniBasedConfig.__init__(get_filename) was deprecated in 2.3.'
636
' Use file_name instead.'],
637
config.IniBasedConfig, lambda: 'ini.conf')
638
self.assertEqual('ini.conf', conf.file_name)
640
def test_get_parser_file_parameter_is_deprecated_(self):
641
config_file = StringIO(sample_config_text.encode('utf-8'))
642
conf = config.IniBasedConfig.from_string(sample_config_text)
643
conf = self.callDeprecated([
644
'IniBasedConfig._get_parser(file=xxx) was deprecated in 2.3.'
645
' Use IniBasedConfig(_content=xxx) instead.'],
646
conf._get_parser, file=config_file)
649
class TestIniConfigSaving(tests.TestCaseInTempDir):
651
def test_cant_save_without_a_file_name(self):
652
conf = config.IniBasedConfig()
653
self.assertRaises(AssertionError, conf._write_config_file)
655
def test_saved_with_content(self):
656
content = 'foo = bar\n'
657
conf = config.IniBasedConfig.from_string(
658
content, file_name='./test.conf', save=True)
659
self.assertFileEqual(content, 'test.conf')
662
class TestIniConfigOptionExpansionDefaultValue(tests.TestCaseInTempDir):
663
"""What is the default value of expand for config options.
665
This is an opt-in beta feature used to evaluate whether or not option
666
references can appear in dangerous place raising exceptions, disapearing
667
(and as such corrupting data) or if it's safe to activate the option by
670
Note that these tests relies on config._expand_default_value being already
671
overwritten in the parent class setUp.
675
super(TestIniConfigOptionExpansionDefaultValue, self).setUp()
679
self.warnings.append(args[0] % args[1:])
680
self.overrideAttr(trace, 'warning', warning)
682
def get_config(self, expand):
683
c = config.GlobalConfig.from_string('bzr.config.expand=%s' % (expand,),
687
def assertExpandIs(self, expected):
688
actual = config._get_expand_default_value()
689
#self.config.get_user_option_as_bool('bzr.config.expand')
690
self.assertEquals(expected, actual)
692
def test_default_is_None(self):
693
self.assertEquals(None, config._expand_default_value)
695
def test_default_is_False_even_if_None(self):
696
self.config = self.get_config(None)
697
self.assertExpandIs(False)
699
def test_default_is_False_even_if_invalid(self):
700
self.config = self.get_config('<your choice>')
701
self.assertExpandIs(False)
703
# Huh ? My choice is False ? Thanks, always happy to hear that :D
704
# Wait, you've been warned !
705
self.assertLength(1, self.warnings)
707
'Value "<your choice>" is not a boolean for "bzr.config.expand"',
710
def test_default_is_True(self):
711
self.config = self.get_config(True)
712
self.assertExpandIs(True)
714
def test_default_is_False(self):
715
self.config = self.get_config(False)
716
self.assertExpandIs(False)
719
class TestIniConfigOptionExpansion(tests.TestCase):
720
"""Test option expansion from the IniConfig level.
722
What we really want here is to test the Config level, but the class being
723
abstract as far as storing values is concerned, this can't be done
726
# FIXME: This should be rewritten when all configs share a storage
727
# implementation -- vila 2011-02-18
729
def get_config(self, string=None):
732
c = config.IniBasedConfig.from_string(string)
735
def assertExpansion(self, expected, conf, string, env=None):
736
self.assertEquals(expected, conf.expand_options(string, env))
738
def test_no_expansion(self):
739
c = self.get_config('')
740
self.assertExpansion('foo', c, 'foo')
742
def test_env_adding_options(self):
743
c = self.get_config('')
744
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
746
def test_env_overriding_options(self):
747
c = self.get_config('foo=baz')
748
self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
750
def test_simple_ref(self):
751
c = self.get_config('foo=xxx')
752
self.assertExpansion('xxx', c, '{foo}')
754
def test_unknown_ref(self):
755
c = self.get_config('')
756
self.assertRaises(errors.ExpandingUnknownOption,
757
c.expand_options, '{foo}')
759
def test_indirect_ref(self):
760
c = self.get_config('''
764
self.assertExpansion('xxx', c, '{bar}')
766
def test_embedded_ref(self):
767
c = self.get_config('''
771
self.assertExpansion('xxx', c, '{{bar}}')
773
def test_simple_loop(self):
774
c = self.get_config('foo={foo}')
775
self.assertRaises(errors.OptionExpansionLoop, c.expand_options, '{foo}')
777
def test_indirect_loop(self):
778
c = self.get_config('''
782
e = self.assertRaises(errors.OptionExpansionLoop,
783
c.expand_options, '{foo}')
784
self.assertEquals('foo->bar->baz', e.refs)
785
self.assertEquals('{foo}', e.string)
788
conf = self.get_config('''
792
list={foo},{bar},{baz}
794
self.assertEquals(['start', 'middle', 'end'],
795
conf.get_user_option('list', expand=True))
797
def test_cascading_list(self):
798
conf = self.get_config('''
804
self.assertEquals(['start', 'middle', 'end'],
805
conf.get_user_option('list', expand=True))
807
def test_pathological_hidden_list(self):
808
conf = self.get_config('''
814
hidden={start}{middle}{end}
816
# Nope, it's either a string or a list, and the list wins as soon as a
817
# ',' appears, so the string concatenation never occur.
818
self.assertEquals(['{foo', '}', '{', 'bar}'],
819
conf.get_user_option('hidden', expand=True))
821
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
823
def get_config(self, location, string=None):
826
# Since we don't save the config we won't strictly require to inherit
827
# from TestCaseInTempDir, but an error occurs so quickly...
828
c = config.LocationConfig.from_string(string, location)
831
def test_dont_cross_unrelated_section(self):
832
c = self.get_config('/another/branch/path','''
837
[/another/branch/path]
840
self.assertRaises(errors.ExpandingUnknownOption,
841
c.get_user_option, 'bar', expand=True)
843
def test_cross_related_sections(self):
844
c = self.get_config('/project/branch/path','''
848
[/project/branch/path]
851
self.assertEquals('quux', c.get_user_option('bar', expand=True))
854
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
856
def test_cannot_reload_without_name(self):
857
conf = config.IniBasedConfig.from_string(sample_config_text)
858
self.assertRaises(AssertionError, conf.reload)
860
def test_reload_see_new_value(self):
861
c1 = config.IniBasedConfig.from_string('editor=vim\n',
862
file_name='./test/conf')
863
c1._write_config_file()
864
c2 = config.IniBasedConfig.from_string('editor=emacs\n',
865
file_name='./test/conf')
866
c2._write_config_file()
867
self.assertEqual('vim', c1.get_user_option('editor'))
868
self.assertEqual('emacs', c2.get_user_option('editor'))
869
# Make sure we get the Right value
871
self.assertEqual('emacs', c1.get_user_option('editor'))
874
class TestLockableConfig(tests.TestCaseInTempDir):
876
scenarios = lockable_config_scenarios()
881
config_section = None
884
super(TestLockableConfig, self).setUp()
885
self._content = '[%s]\none=1\ntwo=2\n' % (self.config_section,)
886
self.config = self.create_config(self._content)
888
def get_existing_config(self):
889
return self.config_class(*self.config_args)
891
def create_config(self, content):
892
kwargs = dict(save=True)
893
c = self.config_class.from_string(content, *self.config_args, **kwargs)
896
def test_simple_read_access(self):
897
self.assertEquals('1', self.config.get_user_option('one'))
899
def test_simple_write_access(self):
900
self.config.set_user_option('one', 'one')
901
self.assertEquals('one', self.config.get_user_option('one'))
903
def test_listen_to_the_last_speaker(self):
905
c2 = self.get_existing_config()
906
c1.set_user_option('one', 'ONE')
907
c2.set_user_option('two', 'TWO')
908
self.assertEquals('ONE', c1.get_user_option('one'))
909
self.assertEquals('TWO', c2.get_user_option('two'))
910
# The second update respect the first one
911
self.assertEquals('ONE', c2.get_user_option('one'))
913
def test_last_speaker_wins(self):
914
# If the same config is not shared, the same variable modified twice
915
# can only see a single result.
917
c2 = self.get_existing_config()
918
c1.set_user_option('one', 'c1')
919
c2.set_user_option('one', 'c2')
920
self.assertEquals('c2', c2._get_user_option('one'))
921
# The first modification is still available until another refresh
923
self.assertEquals('c1', c1._get_user_option('one'))
924
c1.set_user_option('two', 'done')
925
self.assertEquals('c2', c1._get_user_option('one'))
927
def test_writes_are_serialized(self):
929
c2 = self.get_existing_config()
931
# We spawn a thread that will pause *during* the write
932
before_writing = threading.Event()
933
after_writing = threading.Event()
934
writing_done = threading.Event()
935
c1_orig = c1._write_config_file
936
def c1_write_config_file():
939
# The lock is held. We wait for the main thread to decide when to
942
c1._write_config_file = c1_write_config_file
944
c1.set_user_option('one', 'c1')
946
t1 = threading.Thread(target=c1_set_option)
947
# Collect the thread after the test
948
self.addCleanup(t1.join)
949
# Be ready to unblock the thread if the test goes wrong
950
self.addCleanup(after_writing.set)
952
before_writing.wait()
953
self.assertTrue(c1._lock.is_held)
954
self.assertRaises(errors.LockContention,
955
c2.set_user_option, 'one', 'c2')
956
self.assertEquals('c1', c1.get_user_option('one'))
957
# Let the lock be released
960
c2.set_user_option('one', 'c2')
961
self.assertEquals('c2', c2.get_user_option('one'))
963
def test_read_while_writing(self):
965
# We spawn a thread that will pause *during* the write
966
ready_to_write = threading.Event()
967
do_writing = threading.Event()
968
writing_done = threading.Event()
969
c1_orig = c1._write_config_file
970
def c1_write_config_file():
972
# The lock is held. We wait for the main thread to decide when to
977
c1._write_config_file = c1_write_config_file
979
c1.set_user_option('one', 'c1')
980
t1 = threading.Thread(target=c1_set_option)
981
# Collect the thread after the test
982
self.addCleanup(t1.join)
983
# Be ready to unblock the thread if the test goes wrong
984
self.addCleanup(do_writing.set)
986
# Ensure the thread is ready to write
987
ready_to_write.wait()
988
self.assertTrue(c1._lock.is_held)
989
self.assertEquals('c1', c1.get_user_option('one'))
990
# If we read during the write, we get the old value
991
c2 = self.get_existing_config()
992
self.assertEquals('1', c2.get_user_option('one'))
993
# Let the writing occur and ensure it occurred
996
# Now we get the updated value
997
c3 = self.get_existing_config()
998
self.assertEquals('c1', c3.get_user_option('one'))
1001
class TestGetUserOptionAs(TestIniConfig):
1003
def test_get_user_option_as_bool(self):
1004
conf, parser = self.make_config_parser("""
1007
an_invalid_bool = maybe
1008
a_list = hmm, who knows ? # This is interpreted as a list !
1010
get_bool = conf.get_user_option_as_bool
1011
self.assertEqual(True, get_bool('a_true_bool'))
1012
self.assertEqual(False, get_bool('a_false_bool'))
1015
warnings.append(args[0] % args[1:])
1016
self.overrideAttr(trace, 'warning', warning)
1017
msg = 'Value "%s" is not a boolean for "%s"'
1018
self.assertIs(None, get_bool('an_invalid_bool'))
1019
self.assertEquals(msg % ('maybe', 'an_invalid_bool'), warnings[0])
1021
self.assertIs(None, get_bool('not_defined_in_this_config'))
1022
self.assertEquals([], warnings)
1024
def test_get_user_option_as_list(self):
1025
conf, parser = self.make_config_parser("""
1030
get_list = conf.get_user_option_as_list
1031
self.assertEqual(['a', 'b', 'c'], get_list('a_list'))
1032
self.assertEqual(['1'], get_list('length_1'))
1033
self.assertEqual('x', conf.get_user_option('one_item'))
1034
# automatically cast to list
1035
self.assertEqual(['x'], get_list('one_item'))
1038
class TestSupressWarning(TestIniConfig):
1040
def make_warnings_config(self, s):
1041
conf, parser = self.make_config_parser(s)
1042
return conf.suppress_warning
1044
def test_suppress_warning_unknown(self):
1045
suppress_warning = self.make_warnings_config('')
1046
self.assertEqual(False, suppress_warning('unknown_warning'))
1048
def test_suppress_warning_known(self):
1049
suppress_warning = self.make_warnings_config('suppress_warnings=a,b')
1050
self.assertEqual(False, suppress_warning('c'))
1051
self.assertEqual(True, suppress_warning('a'))
1052
self.assertEqual(True, suppress_warning('b'))
1055
class TestGetConfig(tests.TestCase):
231
config_file = StringIO(sample_config_text)
232
my_config = config.IniBasedConfig(None)
233
parser = my_config._get_parser(file=config_file)
234
self.failUnless(my_config._get_parser() is parser)
237
class TestGetConfig(TestCase):
1057
239
def test_constructs(self):
1058
240
my_config = config.GlobalConfig()
1060
242
def test_calls_read_filenames(self):
1061
# replace the class that is constructed, to check its parameters
243
# replace the class that is constructured, to check its parameters
1062
244
oldparserclass = config.ConfigObj
1063
245
config.ConfigObj = InstrumentedConfigObj
1064
246
my_config = config.GlobalConfig()
1360
357
# This is testing the correct file names are provided.
1361
358
# TODO: consolidate with the test for GlobalConfigs filename checks.
1363
# replace the class that is constructed, to check its parameters
360
# replace the class that is constructured, to check its parameters
1364
361
oldparserclass = config.ConfigObj
1365
362
config.ConfigObj = InstrumentedConfigObj
363
my_config = config.LocationConfig('http://www.example.com')
1367
my_config = config.LocationConfig('http://www.example.com')
1368
365
parser = my_config._get_parser()
1370
367
config.ConfigObj = oldparserclass
1371
self.assertIsInstance(parser, InstrumentedConfigObj)
368
self.failUnless(isinstance(parser, InstrumentedConfigObj))
1372
369
self.assertEqual(parser._calls,
1373
[('__init__', config.locations_config_filename(),
370
[('__init__', config.branches_config_filename())])
1376
372
def test_get_global_config(self):
1377
my_config = config.BranchConfig(FakeBranch('http://example.com'))
373
my_config = config.LocationConfig('http://example.com')
1378
374
global_config = my_config._get_global_config()
1379
self.assertIsInstance(global_config, config.GlobalConfig)
1380
self.assertIs(global_config, my_config._get_global_config())
1382
def assertLocationMatching(self, expected):
1383
self.assertEqual(expected,
1384
list(self.my_location_config._get_matching_sections()))
1386
def test__get_matching_sections_no_match(self):
1387
self.get_branch_config('/')
1388
self.assertLocationMatching([])
1390
def test__get_matching_sections_exact(self):
1391
self.get_branch_config('http://www.example.com')
1392
self.assertLocationMatching([('http://www.example.com', '')])
1394
def test__get_matching_sections_suffix_does_not(self):
1395
self.get_branch_config('http://www.example.com-com')
1396
self.assertLocationMatching([])
1398
def test__get_matching_sections_subdir_recursive(self):
1399
self.get_branch_config('http://www.example.com/com')
1400
self.assertLocationMatching([('http://www.example.com', 'com')])
1402
def test__get_matching_sections_ignoreparent(self):
1403
self.get_branch_config('http://www.example.com/ignoreparent')
1404
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1407
def test__get_matching_sections_ignoreparent_subdir(self):
1408
self.get_branch_config(
1409
'http://www.example.com/ignoreparent/childbranch')
1410
self.assertLocationMatching([('http://www.example.com/ignoreparent',
1413
def test__get_matching_sections_subdir_trailing_slash(self):
1414
self.get_branch_config('/b')
1415
self.assertLocationMatching([('/b/', '')])
1417
def test__get_matching_sections_subdir_child(self):
1418
self.get_branch_config('/a/foo')
1419
self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])
1421
def test__get_matching_sections_subdir_child_child(self):
1422
self.get_branch_config('/a/foo/bar')
1423
self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])
1425
def test__get_matching_sections_trailing_slash_with_children(self):
1426
self.get_branch_config('/a/')
1427
self.assertLocationMatching([('/a/', '')])
1429
def test__get_matching_sections_explicit_over_glob(self):
1430
# XXX: 2006-09-08 jamesh
1431
# This test only passes because ord('c') > ord('*'). If there
1432
# was a config section for '/a/?', it would get precedence
1434
self.get_branch_config('/a/c')
1435
self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
1437
def test__get_option_policy_normal(self):
1438
self.get_branch_config('http://www.example.com')
1440
self.my_location_config._get_config_policy(
1441
'http://www.example.com', 'normal_option'),
1444
def test__get_option_policy_norecurse(self):
1445
self.get_branch_config('http://www.example.com')
1447
self.my_location_config._get_option_policy(
1448
'http://www.example.com', 'norecurse_option'),
1449
config.POLICY_NORECURSE)
1450
# Test old recurse=False setting:
1452
self.my_location_config._get_option_policy(
1453
'http://www.example.com/norecurse', 'normal_option'),
1454
config.POLICY_NORECURSE)
1456
def test__get_option_policy_normal(self):
1457
self.get_branch_config('http://www.example.com')
1459
self.my_location_config._get_option_policy(
1460
'http://www.example.com', 'appendpath_option'),
1461
config.POLICY_APPENDPATH)
1463
def test__get_options_with_policy(self):
1464
self.get_branch_config('/dir/subdir',
1465
location_config="""\
1467
other_url = /other-dir
1468
other_url:policy = appendpath
1470
other_url = /other-subdir
1473
[(u'other_url', u'/other-subdir', u'/dir/subdir', 'locations'),
1474
(u'other_url', u'/other-dir', u'/dir', 'locations'),
1475
(u'other_url:policy', u'appendpath', u'/dir', 'locations')],
1476
self.my_location_config)
375
self.failUnless(isinstance(global_config, config.GlobalConfig))
376
self.failUnless(global_config is my_config._get_global_config())
378
def test__get_section_no_match(self):
379
self.get_location_config('/')
380
self.assertEqual(None, self.my_config._get_section())
382
def test__get_section_exact(self):
383
self.get_location_config('http://www.example.com')
384
self.assertEqual('http://www.example.com',
385
self.my_config._get_section())
387
def test__get_section_suffix_does_not(self):
388
self.get_location_config('http://www.example.com-com')
389
self.assertEqual(None, self.my_config._get_section())
391
def test__get_section_subdir_recursive(self):
392
self.get_location_config('http://www.example.com/com')
393
self.assertEqual('http://www.example.com',
394
self.my_config._get_section())
396
def test__get_section_subdir_matches(self):
397
self.get_location_config('http://www.example.com/useglobal')
398
self.assertEqual('http://www.example.com/useglobal',
399
self.my_config._get_section())
401
def test__get_section_subdir_nonrecursive(self):
402
self.get_location_config(
403
'http://www.example.com/useglobal/childbranch')
404
self.assertEqual('http://www.example.com',
405
self.my_config._get_section())
407
def test__get_section_subdir_trailing_slash(self):
408
self.get_location_config('/b')
409
self.assertEqual('/b/', self.my_config._get_section())
411
def test__get_section_subdir_child(self):
412
self.get_location_config('/a/foo')
413
self.assertEqual('/a/*', self.my_config._get_section())
415
def test__get_section_subdir_child_child(self):
416
self.get_location_config('/a/foo/bar')
417
self.assertEqual('/a/', self.my_config._get_section())
419
def test__get_section_trailing_slash_with_children(self):
420
self.get_location_config('/a/')
421
self.assertEqual('/a/', self.my_config._get_section())
423
def test__get_section_explicit_over_glob(self):
424
self.get_location_config('/a/c')
425
self.assertEqual('/a/c', self.my_config._get_section())
427
def get_location_config(self, location, global_config=None):
428
if global_config is None:
429
global_file = StringIO(sample_config_text)
431
global_file = StringIO(global_config)
432
branches_file = StringIO(sample_branches_text)
433
self.my_config = config.LocationConfig(location)
434
self.my_config._get_parser(branches_file)
435
self.my_config._get_global_config()._get_parser(global_file)
1478
437
def test_location_without_username(self):
1479
self.get_branch_config('http://www.example.com/ignoreparent')
1480
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
438
self.get_location_config('http://www.example.com/useglobal')
439
self.assertEqual('Robert Collins <robertc@example.com>',
1481
440
self.my_config.username())
1483
442
def test_location_not_listed(self):
1484
"""Test that the global username is used when no location matches"""
1485
self.get_branch_config('/home/robertc/sources')
1486
self.assertEqual(u'Erik B\u00e5gfors <erik@bagfors.nu>',
443
self.get_location_config('/home/robertc/sources')
444
self.assertEqual('Robert Collins <robertc@example.com>',
1487
445
self.my_config.username())
1489
447
def test_overriding_location(self):
1490
self.get_branch_config('http://www.example.com/foo')
448
self.get_location_config('http://www.example.com/foo')
1491
449
self.assertEqual('Robert Collins <robertc@example.org>',
1492
450
self.my_config.username())
1494
452
def test_signatures_not_set(self):
1495
self.get_branch_config('http://www.example.com',
453
self.get_location_config('http://www.example.com',
1496
454
global_config=sample_ignore_signatures)
1497
self.assertEqual(config.CHECK_ALWAYS,
455
self.assertEqual(config.CHECK_NEVER,
1498
456
self.my_config.signature_checking())
1499
self.assertEqual(config.SIGN_NEVER,
1500
self.my_config.signing_policy())
1502
458
def test_signatures_never(self):
1503
self.get_branch_config('/a/c')
459
self.get_location_config('/a/c')
1504
460
self.assertEqual(config.CHECK_NEVER,
1505
461
self.my_config.signature_checking())
1507
463
def test_signatures_when_available(self):
1508
self.get_branch_config('/a/', global_config=sample_ignore_signatures)
464
self.get_location_config('/a/', global_config=sample_ignore_signatures)
1509
465
self.assertEqual(config.CHECK_IF_POSSIBLE,
1510
466
self.my_config.signature_checking())
1512
468
def test_signatures_always(self):
1513
self.get_branch_config('/b')
469
self.get_location_config('/b')
1514
470
self.assertEqual(config.CHECK_ALWAYS,
1515
471
self.my_config.signature_checking())
1517
473
def test_gpg_signing_command(self):
1518
self.get_branch_config('/b')
474
self.get_location_config('/b')
1519
475
self.assertEqual("gnome-gpg", self.my_config.gpg_signing_command())
1521
477
def test_gpg_signing_command_missing(self):
1522
self.get_branch_config('/a')
478
self.get_location_config('/a')
1523
479
self.assertEqual("false", self.my_config.gpg_signing_command())
1525
481
def test_get_user_option_global(self):
1526
self.get_branch_config('/a')
482
self.get_location_config('/a')
1527
483
self.assertEqual('something',
1528
484
self.my_config.get_user_option('user_global_option'))
1530
486
def test_get_user_option_local(self):
1531
self.get_branch_config('/a')
487
self.get_location_config('/a')
1532
488
self.assertEqual('local',
1533
489
self.my_config.get_user_option('user_local_option'))
1535
def test_get_user_option_appendpath(self):
1536
# returned as is for the base path:
1537
self.get_branch_config('http://www.example.com')
1538
self.assertEqual('append',
1539
self.my_config.get_user_option('appendpath_option'))
1540
# Extra path components get appended:
1541
self.get_branch_config('http://www.example.com/a/b/c')
1542
self.assertEqual('append/a/b/c',
1543
self.my_config.get_user_option('appendpath_option'))
1544
# Overriden for http://www.example.com/dir, where it is a
1546
self.get_branch_config('http://www.example.com/dir/a/b/c')
1547
self.assertEqual('normal',
1548
self.my_config.get_user_option('appendpath_option'))
1550
def test_get_user_option_norecurse(self):
1551
self.get_branch_config('http://www.example.com')
1552
self.assertEqual('norecurse',
1553
self.my_config.get_user_option('norecurse_option'))
1554
self.get_branch_config('http://www.example.com/dir')
1555
self.assertEqual(None,
1556
self.my_config.get_user_option('norecurse_option'))
1557
# http://www.example.com/norecurse is a recurse=False section
1558
# that redefines normal_option. Subdirectories do not pick up
1559
# this redefinition.
1560
self.get_branch_config('http://www.example.com/norecurse')
1561
self.assertEqual('norecurse',
1562
self.my_config.get_user_option('normal_option'))
1563
self.get_branch_config('http://www.example.com/norecurse/subdir')
1564
self.assertEqual('normal',
1565
self.my_config.get_user_option('normal_option'))
1567
def test_set_user_option_norecurse(self):
1568
self.get_branch_config('http://www.example.com')
1569
self.my_config.set_user_option('foo', 'bar',
1570
store=config.STORE_LOCATION_NORECURSE)
1572
self.my_location_config._get_option_policy(
1573
'http://www.example.com', 'foo'),
1574
config.POLICY_NORECURSE)
1576
def test_set_user_option_appendpath(self):
1577
self.get_branch_config('http://www.example.com')
1578
self.my_config.set_user_option('foo', 'bar',
1579
store=config.STORE_LOCATION_APPENDPATH)
1581
self.my_location_config._get_option_policy(
1582
'http://www.example.com', 'foo'),
1583
config.POLICY_APPENDPATH)
1585
def test_set_user_option_change_policy(self):
1586
self.get_branch_config('http://www.example.com')
1587
self.my_config.set_user_option('norecurse_option', 'normal',
1588
store=config.STORE_LOCATION)
1590
self.my_location_config._get_option_policy(
1591
'http://www.example.com', 'norecurse_option'),
1594
def test_set_user_option_recurse_false_section(self):
1595
# The following section has recurse=False set. The test is to
1596
# make sure that a normal option can be added to the section,
1597
# converting recurse=False to the norecurse policy.
1598
self.get_branch_config('http://www.example.com/norecurse')
1599
self.callDeprecated(['The recurse option is deprecated as of 0.14. '
1600
'The section "http://www.example.com/norecurse" '
1601
'has been converted to use policies.'],
1602
self.my_config.set_user_option,
1603
'foo', 'bar', store=config.STORE_LOCATION)
1605
self.my_location_config._get_option_policy(
1606
'http://www.example.com/norecurse', 'foo'),
1608
# The previously existing option is still norecurse:
1610
self.my_location_config._get_option_policy(
1611
'http://www.example.com/norecurse', 'normal_option'),
1612
config.POLICY_NORECURSE)
1614
491
def test_post_commit_default(self):
1615
self.get_branch_config('/a/c')
492
self.get_location_config('/a/c')
1616
493
self.assertEqual('bzrlib.tests.test_config.post_commit',
1617
494
self.my_config.post_commit())
1619
def get_branch_config(self, location, global_config=None,
1620
location_config=None):
1621
my_branch = FakeBranch(location)
497
class TestLocationConfig(TestCaseInTempDir):
499
def get_location_config(self, location, global_config=None):
1622
500
if global_config is None:
1623
global_config = sample_config_text
1624
if location_config is None:
1625
location_config = sample_branches_text
1627
my_global_config = config.GlobalConfig.from_string(global_config,
1629
my_location_config = config.LocationConfig.from_string(
1630
location_config, my_branch.base, save=True)
1631
my_config = config.BranchConfig(my_branch)
1632
self.my_config = my_config
1633
self.my_location_config = my_config._get_location_config()
501
global_file = StringIO(sample_config_text)
503
global_file = StringIO(global_config)
504
branches_file = StringIO(sample_branches_text)
505
self.my_config = config.LocationConfig(location)
506
self.my_config._get_parser(branches_file)
507
self.my_config._get_global_config()._get_parser(global_file)
1635
509
def test_set_user_setting_sets_and_saves(self):
1636
self.get_branch_config('/a/c')
510
# TODO RBC 20051029 test hat mkdir ~/.bazaar is called ..
511
self.get_location_config('/a/c')
1637
512
record = InstrumentedConfigObj("foo")
1638
self.my_location_config._parser = record
1640
self.callDeprecated(['The recurse option is deprecated as of '
1641
'0.14. The section "/a/c" has been '
1642
'converted to use policies.'],
1643
self.my_config.set_user_option,
1644
'foo', 'bar', store=config.STORE_LOCATION)
1645
self.assertEqual([('reload',),
1646
('__contains__', '/a/c'),
513
self.my_config._parser = record
514
print ("test_set_user_setting_sets_and_saves broken: creates .bazaar "
515
"in the top-level directory, not inside the test directory")
517
self.my_config.set_user_option('foo', 'bar')
518
self.assertEqual([('__contains__', '/a/c'),
1647
519
('__contains__', '/a/c/'),
1648
520
('__setitem__', '/a/c', {}),
1649
521
('__getitem__', '/a/c'),
1650
522
('__setitem__', 'foo', 'bar'),
1651
('__getitem__', '/a/c'),
1652
('as_bool', 'recurse'),
1653
('__getitem__', '/a/c'),
1654
('__delitem__', 'recurse'),
1655
('__getitem__', '/a/c'),
1657
('__getitem__', '/a/c'),
1658
('__contains__', 'foo:policy'),
1660
524
record._calls[1:])
1662
def test_set_user_setting_sets_and_saves2(self):
1663
self.get_branch_config('/a/c')
1664
self.assertIs(self.my_config.get_user_option('foo'), None)
1665
self.my_config.set_user_option('foo', 'bar')
1667
self.my_config.branch.control_files.files['branch.conf'].strip(),
1669
self.assertEqual(self.my_config.get_user_option('foo'), 'bar')
1670
self.my_config.set_user_option('foo', 'baz',
1671
store=config.STORE_LOCATION)
1672
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1673
self.my_config.set_user_option('foo', 'qux')
1674
self.assertEqual(self.my_config.get_user_option('foo'), 'baz')
1676
def test_get_bzr_remote_path(self):
1677
my_config = config.LocationConfig('/a/c')
1678
self.assertEqual('bzr', my_config.get_bzr_remote_path())
1679
my_config.set_user_option('bzr_remote_path', '/path-bzr')
1680
self.assertEqual('/path-bzr', my_config.get_bzr_remote_path())
1681
self.overrideEnv('BZR_REMOTE_PATH', '/environ-bzr')
1682
self.assertEqual('/environ-bzr', my_config.get_bzr_remote_path())
1685
precedence_global = 'option = global'
1686
precedence_branch = 'option = branch'
1687
precedence_location = """
1691
[http://example.com/specific]
1695
class TestBranchConfigItems(tests.TestCaseInTempDir):
1697
def get_branch_config(self, global_config=None, location=None,
1698
location_config=None, branch_data_config=None):
1699
my_branch = FakeBranch(location)
1700
if global_config is not None:
1701
my_global_config = config.GlobalConfig.from_string(global_config,
1703
if location_config is not None:
1704
my_location_config = config.LocationConfig.from_string(
1705
location_config, my_branch.base, save=True)
1706
my_config = config.BranchConfig(my_branch)
1707
if branch_data_config is not None:
1708
my_config.branch.control_files.files['branch.conf'] = \
527
class TestBranchConfigItems(TestCase):
1712
529
def test_user_id(self):
1713
branch = FakeBranch(user_id='Robert Collins <robertc@example.net>')
530
branch = FakeBranch()
1714
531
my_config = config.BranchConfig(branch)
1715
532
self.assertEqual("Robert Collins <robertc@example.net>",
1716
my_config.username())
1717
my_config.branch.control_files.files['email'] = "John"
1718
my_config.set_user_option('email',
1719
"Robert Collins <robertc@example.org>")
1720
self.assertEqual("John", my_config.username())
1721
del my_config.branch.control_files.files['email']
1722
self.assertEqual("Robert Collins <robertc@example.org>",
1723
my_config.username())
533
my_config._get_user_id())
534
branch.email = "John"
535
self.assertEqual("John", my_config._get_user_id())
1725
537
def test_not_set_in_branch(self):
1726
my_config = self.get_branch_config(global_config=sample_config_text)
1727
self.assertEqual(u"Erik B\u00e5gfors <erik@bagfors.nu>",
538
branch = FakeBranch()
539
my_config = config.BranchConfig(branch)
541
config_file = StringIO(sample_config_text)
542
(my_config._get_location_config().
543
_get_global_config()._get_parser(config_file))
544
self.assertEqual("Robert Collins <robertc@example.com>",
1728
545
my_config._get_user_id())
1729
my_config.branch.control_files.files['email'] = "John"
546
branch.email = "John"
1730
547
self.assertEqual("John", my_config._get_user_id())
1732
def test_BZR_EMAIL_OVERRIDES(self):
1733
self.overrideEnv('BZR_EMAIL', "Robert Collins <robertc@example.org>")
549
def test_BZREMAIL_OVERRIDES(self):
550
os.environ['BZREMAIL'] = "Robert Collins <robertc@example.org>"
1734
551
branch = FakeBranch()
1735
552
my_config = config.BranchConfig(branch)
1736
553
self.assertEqual("Robert Collins <robertc@example.org>",
1737
554
my_config.username())
1739
556
def test_signatures_forced(self):
1740
my_config = self.get_branch_config(
1741
global_config=sample_always_signatures)
1742
self.assertEqual(config.CHECK_NEVER, my_config.signature_checking())
1743
self.assertEqual(config.SIGN_ALWAYS, my_config.signing_policy())
1744
self.assertTrue(my_config.signature_needed())
1746
def test_signatures_forced_branch(self):
1747
my_config = self.get_branch_config(
1748
global_config=sample_ignore_signatures,
1749
branch_data_config=sample_always_signatures)
1750
self.assertEqual(config.CHECK_NEVER, my_config.signature_checking())
1751
self.assertEqual(config.SIGN_ALWAYS, my_config.signing_policy())
1752
self.assertTrue(my_config.signature_needed())
557
branch = FakeBranch()
558
my_config = config.BranchConfig(branch)
559
config_file = StringIO(sample_always_signatures)
560
(my_config._get_location_config().
561
_get_global_config()._get_parser(config_file))
562
self.assertEqual(config.CHECK_ALWAYS, my_config.signature_checking())
1754
564
def test_gpg_signing_command(self):
1755
my_config = self.get_branch_config(
1756
global_config=sample_config_text,
1757
# branch data cannot set gpg_signing_command
1758
branch_data_config="gpg_signing_command=pgp")
565
branch = FakeBranch()
566
my_config = config.BranchConfig(branch)
567
config_file = StringIO(sample_config_text)
568
(my_config._get_location_config().
569
_get_global_config()._get_parser(config_file))
1759
570
self.assertEqual('gnome-gpg', my_config.gpg_signing_command())
1761
572
def test_get_user_option_global(self):
1762
my_config = self.get_branch_config(global_config=sample_config_text)
573
branch = FakeBranch()
574
my_config = config.BranchConfig(branch)
575
config_file = StringIO(sample_config_text)
576
(my_config._get_location_config().
577
_get_global_config()._get_parser(config_file))
1763
578
self.assertEqual('something',
1764
579
my_config.get_user_option('user_global_option'))
1766
581
def test_post_commit_default(self):
1767
my_config = self.get_branch_config(global_config=sample_config_text,
1769
location_config=sample_branches_text)
1770
self.assertEqual(my_config.branch.base, '/a/c')
1771
self.assertEqual('bzrlib.tests.test_config.post_commit',
1772
my_config.post_commit())
1773
my_config.set_user_option('post_commit', 'rmtree_root')
1774
# post-commit is ignored when present in branch data
1775
self.assertEqual('bzrlib.tests.test_config.post_commit',
1776
my_config.post_commit())
1777
my_config.set_user_option('post_commit', 'rmtree_root',
1778
store=config.STORE_LOCATION)
1779
self.assertEqual('rmtree_root', my_config.post_commit())
1781
def test_config_precedence(self):
1782
# FIXME: eager test, luckily no persitent config file makes it fail
1784
my_config = self.get_branch_config(global_config=precedence_global)
1785
self.assertEqual(my_config.get_user_option('option'), 'global')
1786
my_config = self.get_branch_config(global_config=precedence_global,
1787
branch_data_config=precedence_branch)
1788
self.assertEqual(my_config.get_user_option('option'), 'branch')
1789
my_config = self.get_branch_config(
1790
global_config=precedence_global,
1791
branch_data_config=precedence_branch,
1792
location_config=precedence_location)
1793
self.assertEqual(my_config.get_user_option('option'), 'recurse')
1794
my_config = self.get_branch_config(
1795
global_config=precedence_global,
1796
branch_data_config=precedence_branch,
1797
location_config=precedence_location,
1798
location='http://example.com/specific')
1799
self.assertEqual(my_config.get_user_option('option'), 'exact')
1801
def test_get_mail_client(self):
1802
config = self.get_branch_config()
1803
client = config.get_mail_client()
1804
self.assertIsInstance(client, mail_client.DefaultMail)
1807
config.set_user_option('mail_client', 'evolution')
1808
client = config.get_mail_client()
1809
self.assertIsInstance(client, mail_client.Evolution)
1811
config.set_user_option('mail_client', 'kmail')
1812
client = config.get_mail_client()
1813
self.assertIsInstance(client, mail_client.KMail)
1815
config.set_user_option('mail_client', 'mutt')
1816
client = config.get_mail_client()
1817
self.assertIsInstance(client, mail_client.Mutt)
1819
config.set_user_option('mail_client', 'thunderbird')
1820
client = config.get_mail_client()
1821
self.assertIsInstance(client, mail_client.Thunderbird)
1824
config.set_user_option('mail_client', 'default')
1825
client = config.get_mail_client()
1826
self.assertIsInstance(client, mail_client.DefaultMail)
1828
config.set_user_option('mail_client', 'editor')
1829
client = config.get_mail_client()
1830
self.assertIsInstance(client, mail_client.Editor)
1832
config.set_user_option('mail_client', 'mapi')
1833
client = config.get_mail_client()
1834
self.assertIsInstance(client, mail_client.MAPIClient)
1836
config.set_user_option('mail_client', 'xdg-email')
1837
client = config.get_mail_client()
1838
self.assertIsInstance(client, mail_client.XDGEmail)
1840
config.set_user_option('mail_client', 'firebird')
1841
self.assertRaises(errors.UnknownMailClient, config.get_mail_client)
1844
class TestMailAddressExtraction(tests.TestCase):
582
branch = FakeBranch()
584
my_config = config.BranchConfig(branch)
585
config_file = StringIO(sample_config_text)
586
(my_config._get_location_config().
587
_get_global_config()._get_parser(config_file))
588
branch_file = StringIO(sample_branches_text)
589
my_config._get_location_config()._get_parser(branch_file)
590
self.assertEqual('bzrlib.tests.test_config.post_commit',
591
my_config.post_commit())
594
class TestMailAddressExtraction(TestCase):
1846
596
def test_extract_email_address(self):
1847
597
self.assertEqual('jane@test.com',
1848
598
config.extract_email_address('Jane <jane@test.com>'))
1849
self.assertRaises(errors.NoEmailInUsername,
599
self.assertRaises(errors.BzrError,
1850
600
config.extract_email_address, 'Jane Tester')
1852
def test_parse_username(self):
1853
self.assertEqual(('', 'jdoe@example.com'),
1854
config.parse_username('jdoe@example.com'))
1855
self.assertEqual(('', 'jdoe@example.com'),
1856
config.parse_username('<jdoe@example.com>'))
1857
self.assertEqual(('John Doe', 'jdoe@example.com'),
1858
config.parse_username('John Doe <jdoe@example.com>'))
1859
self.assertEqual(('John Doe', ''),
1860
config.parse_username('John Doe'))
1861
self.assertEqual(('John Doe', 'jdoe@example.com'),
1862
config.parse_username('John Doe jdoe@example.com'))
1864
class TestTreeConfig(tests.TestCaseWithTransport):
1866
def test_get_value(self):
1867
"""Test that retreiving a value from a section is possible"""
1868
branch = self.make_branch('.')
1869
tree_config = config.TreeConfig(branch)
1870
tree_config.set_option('value', 'key', 'SECTION')
1871
tree_config.set_option('value2', 'key2')
1872
tree_config.set_option('value3-top', 'key3')
1873
tree_config.set_option('value3-section', 'key3', 'SECTION')
1874
value = tree_config.get_option('key', 'SECTION')
1875
self.assertEqual(value, 'value')
1876
value = tree_config.get_option('key2')
1877
self.assertEqual(value, 'value2')
1878
self.assertEqual(tree_config.get_option('non-existant'), None)
1879
value = tree_config.get_option('non-existant', 'SECTION')
1880
self.assertEqual(value, None)
1881
value = tree_config.get_option('non-existant', default='default')
1882
self.assertEqual(value, 'default')
1883
self.assertEqual(tree_config.get_option('key2', 'NOSECTION'), None)
1884
value = tree_config.get_option('key2', 'NOSECTION', default='default')
1885
self.assertEqual(value, 'default')
1886
value = tree_config.get_option('key3')
1887
self.assertEqual(value, 'value3-top')
1888
value = tree_config.get_option('key3', 'SECTION')
1889
self.assertEqual(value, 'value3-section')
1892
class TestTransportConfig(tests.TestCaseWithTransport):
1894
def test_load_utf8(self):
1895
"""Ensure we can load an utf8-encoded file."""
1896
t = self.get_transport()
1897
unicode_user = u'b\N{Euro Sign}ar'
1898
unicode_content = u'user=%s' % (unicode_user,)
1899
utf8_content = unicode_content.encode('utf8')
1900
# Store the raw content in the config file
1901
t.put_bytes('foo.conf', utf8_content)
1902
conf = config.TransportConfig(t, 'foo.conf')
1903
self.assertEquals(unicode_user, conf.get_option('user'))
1905
def test_load_non_ascii(self):
1906
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
1907
t = self.get_transport()
1908
t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
1909
conf = config.TransportConfig(t, 'foo.conf')
1910
self.assertRaises(errors.ConfigContentError, conf._get_configobj)
1912
def test_load_erroneous_content(self):
1913
"""Ensure we display a proper error on content that can't be parsed."""
1914
t = self.get_transport()
1915
t.put_bytes('foo.conf', '[open_section\n')
1916
conf = config.TransportConfig(t, 'foo.conf')
1917
self.assertRaises(errors.ParseConfigError, conf._get_configobj)
1919
def test_get_value(self):
1920
"""Test that retreiving a value from a section is possible"""
1921
bzrdir_config = config.TransportConfig(self.get_transport('.'),
1923
bzrdir_config.set_option('value', 'key', 'SECTION')
1924
bzrdir_config.set_option('value2', 'key2')
1925
bzrdir_config.set_option('value3-top', 'key3')
1926
bzrdir_config.set_option('value3-section', 'key3', 'SECTION')
1927
value = bzrdir_config.get_option('key', 'SECTION')
1928
self.assertEqual(value, 'value')
1929
value = bzrdir_config.get_option('key2')
1930
self.assertEqual(value, 'value2')
1931
self.assertEqual(bzrdir_config.get_option('non-existant'), None)
1932
value = bzrdir_config.get_option('non-existant', 'SECTION')
1933
self.assertEqual(value, None)
1934
value = bzrdir_config.get_option('non-existant', default='default')
1935
self.assertEqual(value, 'default')
1936
self.assertEqual(bzrdir_config.get_option('key2', 'NOSECTION'), None)
1937
value = bzrdir_config.get_option('key2', 'NOSECTION',
1939
self.assertEqual(value, 'default')
1940
value = bzrdir_config.get_option('key3')
1941
self.assertEqual(value, 'value3-top')
1942
value = bzrdir_config.get_option('key3', 'SECTION')
1943
self.assertEqual(value, 'value3-section')
1945
def test_set_unset_default_stack_on(self):
1946
my_dir = self.make_bzrdir('.')
1947
bzrdir_config = config.BzrDirConfig(my_dir)
1948
self.assertIs(None, bzrdir_config.get_default_stack_on())
1949
bzrdir_config.set_default_stack_on('Foo')
1950
self.assertEqual('Foo', bzrdir_config._config.get_option(
1951
'default_stack_on'))
1952
self.assertEqual('Foo', bzrdir_config.get_default_stack_on())
1953
bzrdir_config.set_default_stack_on(None)
1954
self.assertIs(None, bzrdir_config.get_default_stack_on())
1957
class TestOldConfigHooks(tests.TestCaseWithTransport):
1960
super(TestOldConfigHooks, self).setUp()
1961
create_configs_with_file_option(self)
1963
def assertGetHook(self, conf, name, value):
1967
config.OldConfigHooks.install_named_hook('get', hook, None)
1969
config.OldConfigHooks.uninstall_named_hook, 'get', None)
1970
self.assertLength(0, calls)
1971
actual_value = conf.get_user_option(name)
1972
self.assertEquals(value, actual_value)
1973
self.assertLength(1, calls)
1974
self.assertEquals((conf, name, value), calls[0])
1976
def test_get_hook_bazaar(self):
1977
self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
1979
def test_get_hook_locations(self):
1980
self.assertGetHook(self.locations_config, 'file', 'locations')
1982
def test_get_hook_branch(self):
1983
# Since locations masks branch, we define a different option
1984
self.branch_config.set_user_option('file2', 'branch')
1985
self.assertGetHook(self.branch_config, 'file2', 'branch')
1987
def assertSetHook(self, conf, name, value):
1991
config.OldConfigHooks.install_named_hook('set', hook, None)
1993
config.OldConfigHooks.uninstall_named_hook, 'set', None)
1994
self.assertLength(0, calls)
1995
conf.set_user_option(name, value)
1996
self.assertLength(1, calls)
1997
# We can't assert the conf object below as different configs use
1998
# different means to implement set_user_option and we care only about
2000
self.assertEquals((name, value), calls[0][1:])
2002
def test_set_hook_bazaar(self):
2003
self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
2005
def test_set_hook_locations(self):
2006
self.assertSetHook(self.locations_config, 'foo', 'locations')
2008
def test_set_hook_branch(self):
2009
self.assertSetHook(self.branch_config, 'foo', 'branch')
2011
def assertRemoveHook(self, conf, name, section_name=None):
2015
config.OldConfigHooks.install_named_hook('remove', hook, None)
2017
config.OldConfigHooks.uninstall_named_hook, 'remove', None)
2018
self.assertLength(0, calls)
2019
conf.remove_user_option(name, section_name)
2020
self.assertLength(1, calls)
2021
# We can't assert the conf object below as different configs use
2022
# different means to implement remove_user_option and we care only about
2024
self.assertEquals((name,), calls[0][1:])
2026
def test_remove_hook_bazaar(self):
2027
self.assertRemoveHook(self.bazaar_config, 'file')
2029
def test_remove_hook_locations(self):
2030
self.assertRemoveHook(self.locations_config, 'file',
2031
self.locations_config.location)
2033
def test_remove_hook_branch(self):
2034
self.assertRemoveHook(self.branch_config, 'file')
2036
def assertLoadHook(self, name, conf_class, *conf_args):
2040
config.OldConfigHooks.install_named_hook('load', hook, None)
2042
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2043
self.assertLength(0, calls)
2045
conf = conf_class(*conf_args)
2046
# Access an option to trigger a load
2047
conf.get_user_option(name)
2048
self.assertLength(1, calls)
2049
# Since we can't assert about conf, we just use the number of calls ;-/
2051
def test_load_hook_bazaar(self):
2052
self.assertLoadHook('file', config.GlobalConfig)
2054
def test_load_hook_locations(self):
2055
self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
2057
def test_load_hook_branch(self):
2058
self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
2060
def assertSaveHook(self, conf):
2064
config.OldConfigHooks.install_named_hook('save', hook, None)
2066
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2067
self.assertLength(0, calls)
2068
# Setting an option triggers a save
2069
conf.set_user_option('foo', 'bar')
2070
self.assertLength(1, calls)
2071
# Since we can't assert about conf, we just use the number of calls ;-/
2073
def test_save_hook_bazaar(self):
2074
self.assertSaveHook(self.bazaar_config)
2076
def test_save_hook_locations(self):
2077
self.assertSaveHook(self.locations_config)
2079
def test_save_hook_branch(self):
2080
self.assertSaveHook(self.branch_config)
2083
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
2084
"""Tests config hooks for remote configs.
2086
No tests for the remove hook as this is not implemented there.
2090
super(TestOldConfigHooksForRemote, self).setUp()
2091
self.transport_server = test_server.SmartTCPServer_for_testing
2092
create_configs_with_file_option(self)
2094
def assertGetHook(self, conf, name, value):
2098
config.OldConfigHooks.install_named_hook('get', hook, None)
2100
config.OldConfigHooks.uninstall_named_hook, 'get', None)
2101
self.assertLength(0, calls)
2102
actual_value = conf.get_option(name)
2103
self.assertEquals(value, actual_value)
2104
self.assertLength(1, calls)
2105
self.assertEquals((conf, name, value), calls[0])
2107
def test_get_hook_remote_branch(self):
2108
remote_branch = branch.Branch.open(self.get_url('tree'))
2109
self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
2111
def test_get_hook_remote_bzrdir(self):
2112
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2113
conf = remote_bzrdir._get_config()
2114
conf.set_option('remotedir', 'file')
2115
self.assertGetHook(conf, 'file', 'remotedir')
2117
def assertSetHook(self, conf, name, value):
2121
config.OldConfigHooks.install_named_hook('set', hook, None)
2123
config.OldConfigHooks.uninstall_named_hook, 'set', None)
2124
self.assertLength(0, calls)
2125
conf.set_option(value, name)
2126
self.assertLength(1, calls)
2127
# We can't assert the conf object below as different configs use
2128
# different means to implement set_user_option and we care only about
2130
self.assertEquals((name, value), calls[0][1:])
2132
def test_set_hook_remote_branch(self):
2133
remote_branch = branch.Branch.open(self.get_url('tree'))
2134
self.addCleanup(remote_branch.lock_write().unlock)
2135
self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
2137
def test_set_hook_remote_bzrdir(self):
2138
remote_branch = branch.Branch.open(self.get_url('tree'))
2139
self.addCleanup(remote_branch.lock_write().unlock)
2140
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2141
self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
2143
def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
2147
config.OldConfigHooks.install_named_hook('load', hook, None)
2149
config.OldConfigHooks.uninstall_named_hook, 'load', None)
2150
self.assertLength(0, calls)
2152
conf = conf_class(*conf_args)
2153
# Access an option to trigger a load
2154
conf.get_option(name)
2155
self.assertLength(expected_nb_calls, calls)
2156
# Since we can't assert about conf, we just use the number of calls ;-/
2158
def test_load_hook_remote_branch(self):
2159
remote_branch = branch.Branch.open(self.get_url('tree'))
2160
self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
2162
def test_load_hook_remote_bzrdir(self):
2163
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2164
# The config file doesn't exist, set an option to force its creation
2165
conf = remote_bzrdir._get_config()
2166
conf.set_option('remotedir', 'file')
2167
# We get one call for the server and one call for the client, this is
2168
# caused by the differences in implementations betwen
2169
# SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
2170
# SmartServerBranchGetConfigFile (in smart/branch.py)
2171
self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
2173
def assertSaveHook(self, conf):
2177
config.OldConfigHooks.install_named_hook('save', hook, None)
2179
config.OldConfigHooks.uninstall_named_hook, 'save', None)
2180
self.assertLength(0, calls)
2181
# Setting an option triggers a save
2182
conf.set_option('foo', 'bar')
2183
self.assertLength(1, calls)
2184
# Since we can't assert about conf, we just use the number of calls ;-/
2186
def test_save_hook_remote_branch(self):
2187
remote_branch = branch.Branch.open(self.get_url('tree'))
2188
self.addCleanup(remote_branch.lock_write().unlock)
2189
self.assertSaveHook(remote_branch._get_config())
2191
def test_save_hook_remote_bzrdir(self):
2192
remote_branch = branch.Branch.open(self.get_url('tree'))
2193
self.addCleanup(remote_branch.lock_write().unlock)
2194
remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2195
self.assertSaveHook(remote_bzrdir._get_config())
2198
class TestOption(tests.TestCase):
2200
def test_default_value(self):
2201
opt = config.Option('foo', default='bar')
2202
self.assertEquals('bar', opt.get_default())
2205
class TestOptionRegistry(tests.TestCase):
2208
super(TestOptionRegistry, self).setUp()
2209
# Always start with an empty registry
2210
self.overrideAttr(config, 'option_registry', registry.Registry())
2211
self.registry = config.option_registry
2213
def test_register(self):
2214
opt = config.Option('foo')
2215
self.registry.register('foo', opt)
2216
self.assertIs(opt, self.registry.get('foo'))
2218
lazy_option = config.Option('lazy_foo')
2220
def test_register_lazy(self):
2221
self.registry.register_lazy('foo', self.__module__,
2222
'TestOptionRegistry.lazy_option')
2223
self.assertIs(self.lazy_option, self.registry.get('foo'))
2225
def test_registered_help(self):
2226
opt = config.Option('foo')
2227
self.registry.register('foo', opt, help='A simple option')
2228
self.assertEquals('A simple option', self.registry.get_help('foo'))
2231
class TestRegisteredOptions(tests.TestCase):
2232
"""All registered options should verify some constraints."""
2234
scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2235
in config.option_registry.iteritems()]
2238
super(TestRegisteredOptions, self).setUp()
2239
self.registry = config.option_registry
2241
def test_proper_name(self):
2242
# An option should be registered under its own name, this can't be
2243
# checked at registration time for the lazy ones.
2244
self.assertEquals(self.option_name, self.option.name)
2246
def test_help_is_set(self):
2247
option_help = self.registry.get_help(self.option_name)
2248
self.assertNotEquals(None, option_help)
2249
# Come on, think about the user, he really wants to know whst the
2251
self.assertNotEquals('', option_help)
2254
class TestSection(tests.TestCase):
2256
# FIXME: Parametrize so that all sections produced by Stores run these
2257
# tests -- vila 2011-04-01
2259
def test_get_a_value(self):
2260
a_dict = dict(foo='bar')
2261
section = config.Section('myID', a_dict)
2262
self.assertEquals('bar', section.get('foo'))
2264
def test_get_unknown_option(self):
2266
section = config.Section(None, a_dict)
2267
self.assertEquals('out of thin air',
2268
section.get('foo', 'out of thin air'))
2270
def test_options_is_shared(self):
2272
section = config.Section(None, a_dict)
2273
self.assertIs(a_dict, section.options)
2276
class TestMutableSection(tests.TestCase):
2278
# FIXME: Parametrize so that all sections (including os.environ and the
2279
# ones produced by Stores) run these tests -- vila 2011-04-01
2282
a_dict = dict(foo='bar')
2283
section = config.MutableSection('myID', a_dict)
2284
section.set('foo', 'new_value')
2285
self.assertEquals('new_value', section.get('foo'))
2286
# The change appears in the shared section
2287
self.assertEquals('new_value', a_dict.get('foo'))
2288
# We keep track of the change
2289
self.assertTrue('foo' in section.orig)
2290
self.assertEquals('bar', section.orig.get('foo'))
2292
def test_set_preserve_original_once(self):
2293
a_dict = dict(foo='bar')
2294
section = config.MutableSection('myID', a_dict)
2295
section.set('foo', 'first_value')
2296
section.set('foo', 'second_value')
2297
# We keep track of the original value
2298
self.assertTrue('foo' in section.orig)
2299
self.assertEquals('bar', section.orig.get('foo'))
2301
def test_remove(self):
2302
a_dict = dict(foo='bar')
2303
section = config.MutableSection('myID', a_dict)
2304
section.remove('foo')
2305
# We get None for unknown options via the default value
2306
self.assertEquals(None, section.get('foo'))
2307
# Or we just get the default value
2308
self.assertEquals('unknown', section.get('foo', 'unknown'))
2309
self.assertFalse('foo' in section.options)
2310
# We keep track of the deletion
2311
self.assertTrue('foo' in section.orig)
2312
self.assertEquals('bar', section.orig.get('foo'))
2314
def test_remove_new_option(self):
2316
section = config.MutableSection('myID', a_dict)
2317
section.set('foo', 'bar')
2318
section.remove('foo')
2319
self.assertFalse('foo' in section.options)
2320
# The option didn't exist initially so it we need to keep track of it
2321
# with a special value
2322
self.assertTrue('foo' in section.orig)
2323
self.assertEquals(config._NewlyCreatedOption, section.orig['foo'])
2326
class TestStore(tests.TestCaseWithTransport):
2328
def assertSectionContent(self, expected, section):
2329
"""Assert that some options have the proper values in a section."""
2330
expected_name, expected_options = expected
2331
self.assertEquals(expected_name, section.id)
2334
dict([(k, section.get(k)) for k in expected_options.keys()]))
2337
class TestReadonlyStore(TestStore):
2339
scenarios = [(key, {'get_store': builder}) for key, builder
2340
in config.test_store_builder_registry.iteritems()]
2343
super(TestReadonlyStore, self).setUp()
2345
def test_building_delays_load(self):
2346
store = self.get_store(self)
2347
self.assertEquals(False, store.is_loaded())
2348
store._load_from_string('')
2349
self.assertEquals(True, store.is_loaded())
2351
def test_get_no_sections_for_empty(self):
2352
store = self.get_store(self)
2353
store._load_from_string('')
2354
self.assertEquals([], list(store.get_sections()))
2356
def test_get_default_section(self):
2357
store = self.get_store(self)
2358
store._load_from_string('foo=bar')
2359
sections = list(store.get_sections())
2360
self.assertLength(1, sections)
2361
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2363
def test_get_named_section(self):
2364
store = self.get_store(self)
2365
store._load_from_string('[baz]\nfoo=bar')
2366
sections = list(store.get_sections())
2367
self.assertLength(1, sections)
2368
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2370
def test_load_from_string_fails_for_non_empty_store(self):
2371
store = self.get_store(self)
2372
store._load_from_string('foo=bar')
2373
self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2376
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2377
"""Simulate loading a config store without content of various encodings.
2379
All files produced by bzr are in utf8 content.
2381
Users may modify them manually and end up with a file that can't be
2382
loaded. We need to issue proper error messages in this case.
2385
invalid_utf8_char = '\xff'
2387
def test_load_utf8(self):
2388
"""Ensure we can load an utf8-encoded file."""
2389
t = self.get_transport()
2390
# From http://pad.lv/799212
2391
unicode_user = u'b\N{Euro Sign}ar'
2392
unicode_content = u'user=%s' % (unicode_user,)
2393
utf8_content = unicode_content.encode('utf8')
2394
# Store the raw content in the config file
2395
t.put_bytes('foo.conf', utf8_content)
2396
store = config.IniFileStore(t, 'foo.conf')
2398
stack = config.Stack([store.get_sections], store)
2399
self.assertEquals(unicode_user, stack.get('user'))
2401
def test_load_non_ascii(self):
2402
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2403
t = self.get_transport()
2404
t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2405
store = config.IniFileStore(t, 'foo.conf')
2406
self.assertRaises(errors.ConfigContentError, store.load)
2408
def test_load_erroneous_content(self):
2409
"""Ensure we display a proper error on content that can't be parsed."""
2410
t = self.get_transport()
2411
t.put_bytes('foo.conf', '[open_section\n')
2412
store = config.IniFileStore(t, 'foo.conf')
2413
self.assertRaises(errors.ParseConfigError, store.load)
2416
class TestIniConfigContent(tests.TestCaseWithTransport):
2417
"""Simulate loading a IniBasedConfig without content of various encodings.
2419
All files produced by bzr are in utf8 content.
2421
Users may modify them manually and end up with a file that can't be
2422
loaded. We need to issue proper error messages in this case.
2425
invalid_utf8_char = '\xff'
2427
def test_load_utf8(self):
2428
"""Ensure we can load an utf8-encoded file."""
2429
# From http://pad.lv/799212
2430
unicode_user = u'b\N{Euro Sign}ar'
2431
unicode_content = u'user=%s' % (unicode_user,)
2432
utf8_content = unicode_content.encode('utf8')
2433
# Store the raw content in the config file
2434
with open('foo.conf', 'wb') as f:
2435
f.write(utf8_content)
2436
conf = config.IniBasedConfig(file_name='foo.conf')
2437
self.assertEquals(unicode_user, conf.get_user_option('user'))
2439
def test_load_badly_encoded_content(self):
2440
"""Ensure we display a proper error on non-ascii, non utf-8 content."""
2441
with open('foo.conf', 'wb') as f:
2442
f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
2443
conf = config.IniBasedConfig(file_name='foo.conf')
2444
self.assertRaises(errors.ConfigContentError, conf._get_parser)
2446
def test_load_erroneous_content(self):
2447
"""Ensure we display a proper error on content that can't be parsed."""
2448
with open('foo.conf', 'wb') as f:
2449
f.write('[open_section\n')
2450
conf = config.IniBasedConfig(file_name='foo.conf')
2451
self.assertRaises(errors.ParseConfigError, conf._get_parser)
2454
class TestMutableStore(TestStore):
2456
scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2457
in config.test_store_builder_registry.iteritems()]
2460
super(TestMutableStore, self).setUp()
2461
self.transport = self.get_transport()
2463
def has_store(self, store):
2464
store_basename = urlutils.relative_url(self.transport.external_url(),
2465
store.external_url())
2466
return self.transport.has(store_basename)
2468
def test_save_empty_creates_no_file(self):
2469
# FIXME: There should be a better way than relying on the test
2470
# parametrization to identify branch.conf -- vila 2011-0526
2471
if self.store_id in ('branch', 'remote_branch'):
2472
raise tests.TestNotApplicable(
2473
'branch.conf is *always* created when a branch is initialized')
2474
store = self.get_store(self)
2476
self.assertEquals(False, self.has_store(store))
2478
def test_save_emptied_succeeds(self):
2479
store = self.get_store(self)
2480
store._load_from_string('foo=bar\n')
2481
section = store.get_mutable_section(None)
2482
section.remove('foo')
2484
self.assertEquals(True, self.has_store(store))
2485
modified_store = self.get_store(self)
2486
sections = list(modified_store.get_sections())
2487
self.assertLength(0, sections)
2489
def test_save_with_content_succeeds(self):
2490
# FIXME: There should be a better way than relying on the test
2491
# parametrization to identify branch.conf -- vila 2011-0526
2492
if self.store_id in ('branch', 'remote_branch'):
2493
raise tests.TestNotApplicable(
2494
'branch.conf is *always* created when a branch is initialized')
2495
store = self.get_store(self)
2496
store._load_from_string('foo=bar\n')
2497
self.assertEquals(False, self.has_store(store))
2499
self.assertEquals(True, self.has_store(store))
2500
modified_store = self.get_store(self)
2501
sections = list(modified_store.get_sections())
2502
self.assertLength(1, sections)
2503
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2505
def test_set_option_in_empty_store(self):
2506
store = self.get_store(self)
2507
section = store.get_mutable_section(None)
2508
section.set('foo', 'bar')
2510
modified_store = self.get_store(self)
2511
sections = list(modified_store.get_sections())
2512
self.assertLength(1, sections)
2513
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2515
def test_set_option_in_default_section(self):
2516
store = self.get_store(self)
2517
store._load_from_string('')
2518
section = store.get_mutable_section(None)
2519
section.set('foo', 'bar')
2521
modified_store = self.get_store(self)
2522
sections = list(modified_store.get_sections())
2523
self.assertLength(1, sections)
2524
self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2526
def test_set_option_in_named_section(self):
2527
store = self.get_store(self)
2528
store._load_from_string('')
2529
section = store.get_mutable_section('baz')
2530
section.set('foo', 'bar')
2532
modified_store = self.get_store(self)
2533
sections = list(modified_store.get_sections())
2534
self.assertLength(1, sections)
2535
self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2537
def test_load_hook(self):
2538
# We first needs to ensure that the store exists
2539
store = self.get_store(self)
2540
section = store.get_mutable_section('baz')
2541
section.set('foo', 'bar')
2543
# Now we can try to load it
2544
store = self.get_store(self)
2548
config.ConfigHooks.install_named_hook('load', hook, None)
2549
self.assertLength(0, calls)
2551
self.assertLength(1, calls)
2552
self.assertEquals((store,), calls[0])
2554
def test_save_hook(self):
2558
config.ConfigHooks.install_named_hook('save', hook, None)
2559
self.assertLength(0, calls)
2560
store = self.get_store(self)
2561
section = store.get_mutable_section('baz')
2562
section.set('foo', 'bar')
2564
self.assertLength(1, calls)
2565
self.assertEquals((store,), calls[0])
2568
class TestIniFileStore(TestStore):
2570
def test_loading_unknown_file_fails(self):
2571
store = config.IniFileStore(self.get_transport(), 'I-do-not-exist')
2572
self.assertRaises(errors.NoSuchFile, store.load)
2574
def test_invalid_content(self):
2575
store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2576
self.assertEquals(False, store.is_loaded())
2577
exc = self.assertRaises(
2578
errors.ParseConfigError, store._load_from_string,
2579
'this is invalid !')
2580
self.assertEndsWith(exc.filename, 'foo.conf')
2581
# And the load failed
2582
self.assertEquals(False, store.is_loaded())
2584
def test_get_embedded_sections(self):
2585
# A more complicated example (which also shows that section names and
2586
# option names share the same name space...)
2587
# FIXME: This should be fixed by forbidding dicts as values ?
2588
# -- vila 2011-04-05
2589
store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2590
store._load_from_string('''
2594
foo_in_DEFAULT=foo_DEFAULT
2602
sections = list(store.get_sections())
2603
self.assertLength(4, sections)
2604
# The default section has no name.
2605
# List values are provided as lists
2606
self.assertSectionContent((None, {'foo': 'bar', 'l': ['1', '2']}),
2608
self.assertSectionContent(
2609
('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2610
self.assertSectionContent(
2611
('bar', {'foo_in_bar': 'barbar'}), sections[2])
2612
# sub sections are provided as embedded dicts.
2613
self.assertSectionContent(
2614
('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2618
class TestLockableIniFileStore(TestStore):
2620
def test_create_store_in_created_dir(self):
2621
self.assertPathDoesNotExist('dir')
2622
t = self.get_transport('dir/subdir')
2623
store = config.LockableIniFileStore(t, 'foo.conf')
2624
store.get_mutable_section(None).set('foo', 'bar')
2626
self.assertPathExists('dir/subdir')
2629
class TestConcurrentStoreUpdates(TestStore):
2630
"""Test that Stores properly handle conccurent updates.
2632
New Store implementation may fail some of these tests but until such
2633
implementations exist it's hard to properly filter them from the scenarios
2634
applied here. If you encounter such a case, contact the bzr devs.
2637
scenarios = [(key, {'get_stack': builder}) for key, builder
2638
in config.test_stack_builder_registry.iteritems()]
2641
super(TestConcurrentStoreUpdates, self).setUp()
2642
self._content = 'one=1\ntwo=2\n'
2643
self.stack = self.get_stack(self)
2644
if not isinstance(self.stack, config._CompatibleStack):
2645
raise tests.TestNotApplicable(
2646
'%s is not meant to be compatible with the old config design'
2648
self.stack.store._load_from_string(self._content)
2650
self.stack.store.save()
2652
def test_simple_read_access(self):
2653
self.assertEquals('1', self.stack.get('one'))
2655
def test_simple_write_access(self):
2656
self.stack.set('one', 'one')
2657
self.assertEquals('one', self.stack.get('one'))
2659
def test_listen_to_the_last_speaker(self):
2661
c2 = self.get_stack(self)
2662
c1.set('one', 'ONE')
2663
c2.set('two', 'TWO')
2664
self.assertEquals('ONE', c1.get('one'))
2665
self.assertEquals('TWO', c2.get('two'))
2666
# The second update respect the first one
2667
self.assertEquals('ONE', c2.get('one'))
2669
def test_last_speaker_wins(self):
2670
# If the same config is not shared, the same variable modified twice
2671
# can only see a single result.
2673
c2 = self.get_stack(self)
2676
self.assertEquals('c2', c2.get('one'))
2677
# The first modification is still available until another refresh
2679
self.assertEquals('c1', c1.get('one'))
2680
c1.set('two', 'done')
2681
self.assertEquals('c2', c1.get('one'))
2683
def test_writes_are_serialized(self):
2685
c2 = self.get_stack(self)
2687
# We spawn a thread that will pause *during* the config saving.
2688
before_writing = threading.Event()
2689
after_writing = threading.Event()
2690
writing_done = threading.Event()
2691
c1_save_without_locking_orig = c1.store.save_without_locking
2692
def c1_save_without_locking():
2693
before_writing.set()
2694
c1_save_without_locking_orig()
2695
# The lock is held. We wait for the main thread to decide when to
2697
after_writing.wait()
2698
c1.store.save_without_locking = c1_save_without_locking
2702
t1 = threading.Thread(target=c1_set)
2703
# Collect the thread after the test
2704
self.addCleanup(t1.join)
2705
# Be ready to unblock the thread if the test goes wrong
2706
self.addCleanup(after_writing.set)
2708
before_writing.wait()
2709
self.assertRaises(errors.LockContention,
2710
c2.set, 'one', 'c2')
2711
self.assertEquals('c1', c1.get('one'))
2712
# Let the lock be released
2716
self.assertEquals('c2', c2.get('one'))
2718
def test_read_while_writing(self):
2720
# We spawn a thread that will pause *during* the write
2721
ready_to_write = threading.Event()
2722
do_writing = threading.Event()
2723
writing_done = threading.Event()
2724
# We override the _save implementation so we know the store is locked
2725
c1_save_without_locking_orig = c1.store.save_without_locking
2726
def c1_save_without_locking():
2727
ready_to_write.set()
2728
# The lock is held. We wait for the main thread to decide when to
2731
c1_save_without_locking_orig()
2733
c1.store.save_without_locking = c1_save_without_locking
2736
t1 = threading.Thread(target=c1_set)
2737
# Collect the thread after the test
2738
self.addCleanup(t1.join)
2739
# Be ready to unblock the thread if the test goes wrong
2740
self.addCleanup(do_writing.set)
2742
# Ensure the thread is ready to write
2743
ready_to_write.wait()
2744
self.assertEquals('c1', c1.get('one'))
2745
# If we read during the write, we get the old value
2746
c2 = self.get_stack(self)
2747
self.assertEquals('1', c2.get('one'))
2748
# Let the writing occur and ensure it occurred
2751
# Now we get the updated value
2752
c3 = self.get_stack(self)
2753
self.assertEquals('c1', c3.get('one'))
2755
# FIXME: It may be worth looking into removing the lock dir when it's not
2756
# needed anymore and look at possible fallouts for concurrent lockers. This
2757
# will matter if/when we use config files outside of bazaar directories
2758
# (.bazaar or .bzr) -- vila 20110-04-11
2761
class TestSectionMatcher(TestStore):
2763
scenarios = [('location', {'matcher': config.LocationMatcher})]
2765
def get_store(self, file_name):
2766
return config.IniFileStore(self.get_readonly_transport(), file_name)
2768
def test_no_matches_for_empty_stores(self):
2769
store = self.get_store('foo.conf')
2770
store._load_from_string('')
2771
matcher = self.matcher(store, '/bar')
2772
self.assertEquals([], list(matcher.get_sections()))
2774
def test_build_doesnt_load_store(self):
2775
store = self.get_store('foo.conf')
2776
matcher = self.matcher(store, '/bar')
2777
self.assertFalse(store.is_loaded())
2780
class TestLocationSection(tests.TestCase):
2782
def get_section(self, options, extra_path):
2783
section = config.Section('foo', options)
2784
# We don't care about the length so we use '0'
2785
return config.LocationSection(section, 0, extra_path)
2787
def test_simple_option(self):
2788
section = self.get_section({'foo': 'bar'}, '')
2789
self.assertEquals('bar', section.get('foo'))
2791
def test_option_with_extra_path(self):
2792
section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
2794
self.assertEquals('bar/baz', section.get('foo'))
2796
def test_invalid_policy(self):
2797
section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
2799
# invalid policies are ignored
2800
self.assertEquals('bar', section.get('foo'))
2803
class TestLocationMatcher(TestStore):
2805
def get_store(self, file_name):
2806
return config.IniFileStore(self.get_readonly_transport(), file_name)
2808
def test_unrelated_section_excluded(self):
2809
store = self.get_store('foo.conf')
2810
store._load_from_string('''
2818
section=/foo/bar/baz
2822
self.assertEquals(['/foo', '/foo/baz', '/foo/bar', '/foo/bar/baz',
2824
[section.id for section in store.get_sections()])
2825
matcher = config.LocationMatcher(store, '/foo/bar/quux')
2826
sections = list(matcher.get_sections())
2827
self.assertEquals([3, 2],
2828
[section.length for section in sections])
2829
self.assertEquals(['/foo/bar', '/foo'],
2830
[section.id for section in sections])
2831
self.assertEquals(['quux', 'bar/quux'],
2832
[section.extra_path for section in sections])
2834
def test_more_specific_sections_first(self):
2835
store = self.get_store('foo.conf')
2836
store._load_from_string('''
2842
self.assertEquals(['/foo', '/foo/bar'],
2843
[section.id for section in store.get_sections()])
2844
matcher = config.LocationMatcher(store, '/foo/bar/baz')
2845
sections = list(matcher.get_sections())
2846
self.assertEquals([3, 2],
2847
[section.length for section in sections])
2848
self.assertEquals(['/foo/bar', '/foo'],
2849
[section.id for section in sections])
2850
self.assertEquals(['baz', 'bar/baz'],
2851
[section.extra_path for section in sections])
2853
def test_appendpath_in_no_name_section(self):
2854
# It's a bit weird to allow appendpath in a no-name section, but
2855
# someone may found a use for it
2856
store = self.get_store('foo.conf')
2857
store._load_from_string('''
2859
foo:policy = appendpath
2861
matcher = config.LocationMatcher(store, 'dir/subdir')
2862
sections = list(matcher.get_sections())
2863
self.assertLength(1, sections)
2864
self.assertEquals('bar/dir/subdir', sections[0].get('foo'))
2866
def test_file_urls_are_normalized(self):
2867
store = self.get_store('foo.conf')
2868
if sys.platform == 'win32':
2869
expected_url = 'file:///C:/dir/subdir'
2870
expected_location = 'C:/dir/subdir'
2872
expected_url = 'file:///dir/subdir'
2873
expected_location = '/dir/subdir'
2874
matcher = config.LocationMatcher(store, expected_url)
2875
self.assertEquals(expected_location, matcher.location)
2878
class TestStackGet(tests.TestCase):
2880
# FIXME: This should be parametrized for all known Stack or dedicated
2881
# paramerized tests created to avoid bloating -- vila 2011-03-31
2883
def test_single_config_get(self):
2884
conf = dict(foo='bar')
2885
conf_stack = config.Stack([conf])
2886
self.assertEquals('bar', conf_stack.get('foo'))
2888
def test_get_with_registered_default_value(self):
2889
conf_stack = config.Stack([dict()])
2890
opt = config.Option('foo', default='bar')
2891
self.overrideAttr(config, 'option_registry', registry.Registry())
2892
config.option_registry.register('foo', opt)
2893
self.assertEquals('bar', conf_stack.get('foo'))
2895
def test_get_without_registered_default_value(self):
2896
conf_stack = config.Stack([dict()])
2897
opt = config.Option('foo')
2898
self.overrideAttr(config, 'option_registry', registry.Registry())
2899
config.option_registry.register('foo', opt)
2900
self.assertEquals(None, conf_stack.get('foo'))
2902
def test_get_without_default_value_for_not_registered(self):
2903
conf_stack = config.Stack([dict()])
2904
opt = config.Option('foo')
2905
self.overrideAttr(config, 'option_registry', registry.Registry())
2906
self.assertEquals(None, conf_stack.get('foo'))
2908
def test_get_first_definition(self):
2909
conf1 = dict(foo='bar')
2910
conf2 = dict(foo='baz')
2911
conf_stack = config.Stack([conf1, conf2])
2912
self.assertEquals('bar', conf_stack.get('foo'))
2914
def test_get_embedded_definition(self):
2915
conf1 = dict(yy='12')
2916
conf2 = config.Stack([dict(xx='42'), dict(foo='baz')])
2917
conf_stack = config.Stack([conf1, conf2])
2918
self.assertEquals('baz', conf_stack.get('foo'))
2920
def test_get_for_empty_section_callable(self):
2921
conf_stack = config.Stack([lambda : []])
2922
self.assertEquals(None, conf_stack.get('foo'))
2924
def test_get_for_broken_callable(self):
2925
# Trying to use and invalid callable raises an exception on first use
2926
conf_stack = config.Stack([lambda : object()])
2927
self.assertRaises(TypeError, conf_stack.get, 'foo')
2930
class TestStackWithTransport(tests.TestCaseWithTransport):
2932
scenarios = [(key, {'get_stack': builder}) for key, builder
2933
in config.test_stack_builder_registry.iteritems()]
2936
class TestConcreteStacks(TestStackWithTransport):
2938
def test_build_stack(self):
2939
# Just a smoke test to help debug builders
2940
stack = self.get_stack(self)
2943
class TestStackGet(TestStackWithTransport):
2946
super(TestStackGet, self).setUp()
2947
self.conf = self.get_stack(self)
2949
def test_get_for_empty_stack(self):
2950
self.assertEquals(None, self.conf.get('foo'))
2952
def test_get_hook(self):
2953
self.conf.store._load_from_string('foo=bar')
2957
config.ConfigHooks.install_named_hook('get', hook, None)
2958
self.assertLength(0, calls)
2959
value = self.conf.get('foo')
2960
self.assertEquals('bar', value)
2961
self.assertLength(1, calls)
2962
self.assertEquals((self.conf, 'foo', 'bar'), calls[0])
2965
class TestStackGetWithConverter(TestStackGet):
2968
super(TestStackGetWithConverter, self).setUp()
2969
self.overrideAttr(config, 'option_registry', registry.Registry())
2970
self.registry = config.option_registry
2972
def register_bool_option(self, name, default):
2973
b = config.Option(name, default=default,
2974
from_unicode=config.bool_from_store)
2975
self.registry.register(b.name, b, help='A boolean.')
2977
def test_get_with_bool_not_defined_default_true(self):
2978
self.register_bool_option('foo', True)
2979
self.assertEquals(True, self.conf.get('foo'))
2981
def test_get_with_bool_not_defined_default_false(self):
2982
self.register_bool_option('foo', False)
2983
self.assertEquals(False, self.conf.get('foo'))
2985
def test_get_with_bool_converter_not_default(self):
2986
self.register_bool_option('foo', False)
2987
self.conf.store._load_from_string('foo=yes')
2988
self.assertEquals(True, self.conf.get('foo'))
2990
def test_get_with_bool_converter_invalid_string(self):
2991
self.register_bool_option('foo', False)
2992
self.conf.store._load_from_string('foo=not-a-boolean')
2993
self.assertEquals(False, self.conf.get('foo'))
2995
def test_get_with_bool_converter_invalid_list(self):
2996
self.register_bool_option('foo', False)
2997
self.conf.store._load_from_string('foo=not,a,boolean')
2998
self.assertEquals(False, self.conf.get('foo'))
3000
class TestStackSet(TestStackWithTransport):
3002
def test_simple_set(self):
3003
conf = self.get_stack(self)
3004
conf.store._load_from_string('foo=bar')
3005
self.assertEquals('bar', conf.get('foo'))
3006
conf.set('foo', 'baz')
3007
# Did we get it back ?
3008
self.assertEquals('baz', conf.get('foo'))
3010
def test_set_creates_a_new_section(self):
3011
conf = self.get_stack(self)
3012
conf.set('foo', 'baz')
3013
self.assertEquals, 'baz', conf.get('foo')
3015
def test_set_hook(self):
3019
config.ConfigHooks.install_named_hook('set', hook, None)
3020
self.assertLength(0, calls)
3021
conf = self.get_stack(self)
3022
conf.set('foo', 'bar')
3023
self.assertLength(1, calls)
3024
self.assertEquals((conf, 'foo', 'bar'), calls[0])
3027
class TestStackRemove(TestStackWithTransport):
3029
def test_remove_existing(self):
3030
conf = self.get_stack(self)
3031
conf.store._load_from_string('foo=bar')
3032
self.assertEquals('bar', conf.get('foo'))
3034
# Did we get it back ?
3035
self.assertEquals(None, conf.get('foo'))
3037
def test_remove_unknown(self):
3038
conf = self.get_stack(self)
3039
self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
3041
def test_remove_hook(self):
3045
config.ConfigHooks.install_named_hook('remove', hook, None)
3046
self.assertLength(0, calls)
3047
conf = self.get_stack(self)
3048
conf.store._load_from_string('foo=bar')
3050
self.assertLength(1, calls)
3051
self.assertEquals((conf, 'foo'), calls[0])
3054
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
3057
super(TestConfigGetOptions, self).setUp()
3058
create_configs(self)
3060
def test_no_variable(self):
3061
# Using branch should query branch, locations and bazaar
3062
self.assertOptions([], self.branch_config)
3064
def test_option_in_bazaar(self):
3065
self.bazaar_config.set_user_option('file', 'bazaar')
3066
self.assertOptions([('file', 'bazaar', 'DEFAULT', 'bazaar')],
3069
def test_option_in_locations(self):
3070
self.locations_config.set_user_option('file', 'locations')
3072
[('file', 'locations', self.tree.basedir, 'locations')],
3073
self.locations_config)
3075
def test_option_in_branch(self):
3076
self.branch_config.set_user_option('file', 'branch')
3077
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch')],
3080
def test_option_in_bazaar_and_branch(self):
3081
self.bazaar_config.set_user_option('file', 'bazaar')
3082
self.branch_config.set_user_option('file', 'branch')
3083
self.assertOptions([('file', 'branch', 'DEFAULT', 'branch'),
3084
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
3087
def test_option_in_branch_and_locations(self):
3088
# Hmm, locations override branch :-/
3089
self.locations_config.set_user_option('file', 'locations')
3090
self.branch_config.set_user_option('file', 'branch')
3092
[('file', 'locations', self.tree.basedir, 'locations'),
3093
('file', 'branch', 'DEFAULT', 'branch'),],
3096
def test_option_in_bazaar_locations_and_branch(self):
3097
self.bazaar_config.set_user_option('file', 'bazaar')
3098
self.locations_config.set_user_option('file', 'locations')
3099
self.branch_config.set_user_option('file', 'branch')
3101
[('file', 'locations', self.tree.basedir, 'locations'),
3102
('file', 'branch', 'DEFAULT', 'branch'),
3103
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
3107
class TestConfigRemoveOption(tests.TestCaseWithTransport, TestOptionsMixin):
3110
super(TestConfigRemoveOption, self).setUp()
3111
create_configs_with_file_option(self)
3113
def test_remove_in_locations(self):
3114
self.locations_config.remove_user_option('file', self.tree.basedir)
3116
[('file', 'branch', 'DEFAULT', 'branch'),
3117
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
3120
def test_remove_in_branch(self):
3121
self.branch_config.remove_user_option('file')
3123
[('file', 'locations', self.tree.basedir, 'locations'),
3124
('file', 'bazaar', 'DEFAULT', 'bazaar'),],
3127
def test_remove_in_bazaar(self):
3128
self.bazaar_config.remove_user_option('file')
3130
[('file', 'locations', self.tree.basedir, 'locations'),
3131
('file', 'branch', 'DEFAULT', 'branch'),],
3135
class TestConfigGetSections(tests.TestCaseWithTransport):
3138
super(TestConfigGetSections, self).setUp()
3139
create_configs(self)
3141
def assertSectionNames(self, expected, conf, name=None):
3142
"""Check which sections are returned for a given config.
3144
If fallback configurations exist their sections can be included.
3146
:param expected: A list of section names.
3148
:param conf: The configuration that will be queried.
3150
:param name: An optional section name that will be passed to
3153
sections = list(conf._get_sections(name))
3154
self.assertLength(len(expected), sections)
3155
self.assertEqual(expected, [name for name, _, _ in sections])
3157
def test_bazaar_default_section(self):
3158
self.assertSectionNames(['DEFAULT'], self.bazaar_config)
3160
def test_locations_default_section(self):
3161
# No sections are defined in an empty file
3162
self.assertSectionNames([], self.locations_config)
3164
def test_locations_named_section(self):
3165
self.locations_config.set_user_option('file', 'locations')
3166
self.assertSectionNames([self.tree.basedir], self.locations_config)
3168
def test_locations_matching_sections(self):
3169
loc_config = self.locations_config
3170
loc_config.set_user_option('file', 'locations')
3171
# We need to cheat a bit here to create an option in sections above and
3172
# below the 'location' one.
3173
parser = loc_config._get_parser()
3174
# locations.cong deals with '/' ignoring native os.sep
3175
location_names = self.tree.basedir.split('/')
3176
parent = '/'.join(location_names[:-1])
3177
child = '/'.join(location_names + ['child'])
3179
parser[parent]['file'] = 'parent'
3181
parser[child]['file'] = 'child'
3182
self.assertSectionNames([self.tree.basedir, parent], loc_config)
3184
def test_branch_data_default_section(self):
3185
self.assertSectionNames([None],
3186
self.branch_config._get_branch_data_config())
3188
def test_branch_default_sections(self):
3189
# No sections are defined in an empty locations file
3190
self.assertSectionNames([None, 'DEFAULT'],
3192
# Unless we define an option
3193
self.branch_config._get_location_config().set_user_option(
3194
'file', 'locations')
3195
self.assertSectionNames([self.tree.basedir, None, 'DEFAULT'],
3198
def test_bazaar_named_section(self):
3199
# We need to cheat as the API doesn't give direct access to sections
3200
# other than DEFAULT.
3201
self.bazaar_config.set_alias('bazaar', 'bzr')
3202
self.assertSectionNames(['ALIASES'], self.bazaar_config, 'ALIASES')
3205
class TestAuthenticationConfigFile(tests.TestCase):
3206
"""Test the authentication.conf file matching"""
3208
def _got_user_passwd(self, expected_user, expected_password,
3209
config, *args, **kwargs):
3210
credentials = config.get_credentials(*args, **kwargs)
3211
if credentials is None:
3215
user = credentials['user']
3216
password = credentials['password']
3217
self.assertEquals(expected_user, user)
3218
self.assertEquals(expected_password, password)
3220
def test_empty_config(self):
3221
conf = config.AuthenticationConfig(_file=StringIO())
3222
self.assertEquals({}, conf._get_config())
3223
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
3225
def test_non_utf8_config(self):
3226
conf = config.AuthenticationConfig(_file=StringIO(
3228
self.assertRaises(errors.ConfigContentError, conf._get_config)
3230
def test_missing_auth_section_header(self):
3231
conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
3232
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
3234
def test_auth_section_header_not_closed(self):
3235
conf = config.AuthenticationConfig(_file=StringIO('[DEF'))
3236
self.assertRaises(errors.ParseConfigError, conf._get_config)
3238
def test_auth_value_not_boolean(self):
3239
conf = config.AuthenticationConfig(_file=StringIO(
3243
verify_certificates=askme # Error: Not a boolean
3245
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
3247
def test_auth_value_not_int(self):
3248
conf = config.AuthenticationConfig(_file=StringIO(
3252
port=port # Error: Not an int
3254
self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
3256
def test_unknown_password_encoding(self):
3257
conf = config.AuthenticationConfig(_file=StringIO(
3261
password_encoding=unknown
3263
self.assertRaises(ValueError, conf.get_password,
3264
'ftp', 'foo.net', 'joe')
3266
def test_credentials_for_scheme_host(self):
3267
conf = config.AuthenticationConfig(_file=StringIO(
3268
"""# Identity on foo.net
3273
password=secret-pass
3276
self._got_user_passwd('joe', 'secret-pass', conf, 'ftp', 'foo.net')
3278
self._got_user_passwd(None, None, conf, 'http', 'foo.net')
3280
self._got_user_passwd(None, None, conf, 'ftp', 'bar.net')
3282
def test_credentials_for_host_port(self):
3283
conf = config.AuthenticationConfig(_file=StringIO(
3284
"""# Identity on foo.net
3290
password=secret-pass
3293
self._got_user_passwd('joe', 'secret-pass',
3294
conf, 'ftp', 'foo.net', port=10021)
3296
self._got_user_passwd(None, None, conf, 'ftp', 'foo.net')
3298
def test_for_matching_host(self):
3299
conf = config.AuthenticationConfig(_file=StringIO(
3300
"""# Identity on foo.net
3306
[sourceforge domain]
3313
self._got_user_passwd('georges', 'bendover',
3314
conf, 'bzr', 'foo.bzr.sf.net')
3316
self._got_user_passwd(None, None,
3317
conf, 'bzr', 'bbzr.sf.net')
3319
def test_for_matching_host_None(self):
3320
conf = config.AuthenticationConfig(_file=StringIO(
3321
"""# Identity on foo.net
3331
self._got_user_passwd('joe', 'joepass',
3332
conf, 'bzr', 'quux.net')
3333
# no host but different scheme
3334
self._got_user_passwd('georges', 'bendover',
3335
conf, 'ftp', 'quux.net')
3337
def test_credentials_for_path(self):
3338
conf = config.AuthenticationConfig(_file=StringIO(
3354
self._got_user_passwd(None, None,
3355
conf, 'http', host='bar.org', path='/dir3')
3357
self._got_user_passwd('georges', 'bendover',
3358
conf, 'http', host='bar.org', path='/dir2')
3360
self._got_user_passwd('jim', 'jimpass',
3361
conf, 'http', host='bar.org',path='/dir1/subdir')
3363
def test_credentials_for_user(self):
3364
conf = config.AuthenticationConfig(_file=StringIO(
3373
self._got_user_passwd('jim', 'jimpass',
3374
conf, 'http', 'bar.org')
3376
self._got_user_passwd('jim', 'jimpass',
3377
conf, 'http', 'bar.org', user='jim')
3378
# Don't get a different user if one is specified
3379
self._got_user_passwd(None, None,
3380
conf, 'http', 'bar.org', user='georges')
3382
def test_credentials_for_user_without_password(self):
3383
conf = config.AuthenticationConfig(_file=StringIO(
3390
# Get user but no password
3391
self._got_user_passwd('jim', None,
3392
conf, 'http', 'bar.org')
3394
def test_verify_certificates(self):
3395
conf = config.AuthenticationConfig(_file=StringIO(
3402
verify_certificates=False
3409
credentials = conf.get_credentials('https', 'bar.org')
3410
self.assertEquals(False, credentials.get('verify_certificates'))
3411
credentials = conf.get_credentials('https', 'foo.net')
3412
self.assertEquals(True, credentials.get('verify_certificates'))
3415
class TestAuthenticationStorage(tests.TestCaseInTempDir):
3417
def test_set_credentials(self):
3418
conf = config.AuthenticationConfig()
3419
conf.set_credentials('name', 'host', 'user', 'scheme', 'password',
3420
99, path='/foo', verify_certificates=False, realm='realm')
3421
credentials = conf.get_credentials(host='host', scheme='scheme',
3422
port=99, path='/foo',
3424
CREDENTIALS = {'name': 'name', 'user': 'user', 'password': 'password',
3425
'verify_certificates': False, 'scheme': 'scheme',
3426
'host': 'host', 'port': 99, 'path': '/foo',
3428
self.assertEqual(CREDENTIALS, credentials)
3429
credentials_from_disk = config.AuthenticationConfig().get_credentials(
3430
host='host', scheme='scheme', port=99, path='/foo', realm='realm')
3431
self.assertEqual(CREDENTIALS, credentials_from_disk)
3433
def test_reset_credentials_different_name(self):
3434
conf = config.AuthenticationConfig()
3435
conf.set_credentials('name', 'host', 'user', 'scheme', 'password'),
3436
conf.set_credentials('name2', 'host', 'user2', 'scheme', 'password'),
3437
self.assertIs(None, conf._get_config().get('name'))
3438
credentials = conf.get_credentials(host='host', scheme='scheme')
3439
CREDENTIALS = {'name': 'name2', 'user': 'user2', 'password':
3440
'password', 'verify_certificates': True,
3441
'scheme': 'scheme', 'host': 'host', 'port': None,
3442
'path': None, 'realm': None}
3443
self.assertEqual(CREDENTIALS, credentials)
3446
class TestAuthenticationConfig(tests.TestCase):
3447
"""Test AuthenticationConfig behaviour"""
3449
def _check_default_password_prompt(self, expected_prompt_format, scheme,
3450
host=None, port=None, realm=None,
3454
user, password = 'jim', 'precious'
3455
expected_prompt = expected_prompt_format % {
3456
'scheme': scheme, 'host': host, 'port': port,
3457
'user': user, 'realm': realm}
3459
stdout = tests.StringIOWrapper()
3460
stderr = tests.StringIOWrapper()
3461
ui.ui_factory = tests.TestUIFactory(stdin=password + '\n',
3462
stdout=stdout, stderr=stderr)
3463
# We use an empty conf so that the user is always prompted
3464
conf = config.AuthenticationConfig()
3465
self.assertEquals(password,
3466
conf.get_password(scheme, host, user, port=port,
3467
realm=realm, path=path))
3468
self.assertEquals(expected_prompt, stderr.getvalue())
3469
self.assertEquals('', stdout.getvalue())
3471
def _check_default_username_prompt(self, expected_prompt_format, scheme,
3472
host=None, port=None, realm=None,
3477
expected_prompt = expected_prompt_format % {
3478
'scheme': scheme, 'host': host, 'port': port,
3480
stdout = tests.StringIOWrapper()
3481
stderr = tests.StringIOWrapper()
3482
ui.ui_factory = tests.TestUIFactory(stdin=username+ '\n',
3483
stdout=stdout, stderr=stderr)
3484
# We use an empty conf so that the user is always prompted
3485
conf = config.AuthenticationConfig()
3486
self.assertEquals(username, conf.get_user(scheme, host, port=port,
3487
realm=realm, path=path, ask=True))
3488
self.assertEquals(expected_prompt, stderr.getvalue())
3489
self.assertEquals('', stdout.getvalue())
3491
def test_username_defaults_prompts(self):
3492
# HTTP prompts can't be tested here, see test_http.py
3493
self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
3494
self._check_default_username_prompt(
3495
u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
3496
self._check_default_username_prompt(
3497
u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
3499
def test_username_default_no_prompt(self):
3500
conf = config.AuthenticationConfig()
3501
self.assertEquals(None,
3502
conf.get_user('ftp', 'example.com'))
3503
self.assertEquals("explicitdefault",
3504
conf.get_user('ftp', 'example.com', default="explicitdefault"))
3506
def test_password_default_prompts(self):
3507
# HTTP prompts can't be tested here, see test_http.py
3508
self._check_default_password_prompt(
3509
u'FTP %(user)s@%(host)s password: ', 'ftp')
3510
self._check_default_password_prompt(
3511
u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
3512
self._check_default_password_prompt(
3513
u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
3514
# SMTP port handling is a bit special (it's handled if embedded in the
3516
# FIXME: should we: forbid that, extend it to other schemes, leave
3517
# things as they are that's fine thank you ?
3518
self._check_default_password_prompt(
3519
u'SMTP %(user)s@%(host)s password: ', 'smtp')
3520
self._check_default_password_prompt(
3521
u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
3522
self._check_default_password_prompt(
3523
u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
3525
def test_ssh_password_emits_warning(self):
3526
conf = config.AuthenticationConfig(_file=StringIO(
3534
entered_password = 'typed-by-hand'
3535
stdout = tests.StringIOWrapper()
3536
stderr = tests.StringIOWrapper()
3537
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n',
3538
stdout=stdout, stderr=stderr)
3540
# Since the password defined in the authentication config is ignored,
3541
# the user is prompted
3542
self.assertEquals(entered_password,
3543
conf.get_password('ssh', 'bar.org', user='jim'))
3544
self.assertContainsRe(
3546
'password ignored in section \[ssh with password\]')
3548
def test_ssh_without_password_doesnt_emit_warning(self):
3549
conf = config.AuthenticationConfig(_file=StringIO(
3556
entered_password = 'typed-by-hand'
3557
stdout = tests.StringIOWrapper()
3558
stderr = tests.StringIOWrapper()
3559
ui.ui_factory = tests.TestUIFactory(stdin=entered_password + '\n',
3563
# Since the password defined in the authentication config is ignored,
3564
# the user is prompted
3565
self.assertEquals(entered_password,
3566
conf.get_password('ssh', 'bar.org', user='jim'))
3567
# No warning shoud be emitted since there is no password. We are only
3569
self.assertNotContainsRe(
3571
'password ignored in section \[ssh with password\]')
3573
def test_uses_fallback_stores(self):
3574
self.overrideAttr(config, 'credential_store_registry',
3575
config.CredentialStoreRegistry())
3576
store = StubCredentialStore()
3577
store.add_credentials("http", "example.com", "joe", "secret")
3578
config.credential_store_registry.register("stub", store, fallback=True)
3579
conf = config.AuthenticationConfig(_file=StringIO())
3580
creds = conf.get_credentials("http", "example.com")
3581
self.assertEquals("joe", creds["user"])
3582
self.assertEquals("secret", creds["password"])
3585
class StubCredentialStore(config.CredentialStore):
3591
def add_credentials(self, scheme, host, user, password=None):
3592
self._username[(scheme, host)] = user
3593
self._password[(scheme, host)] = password
3595
def get_credentials(self, scheme, host, port=None, user=None,
3596
path=None, realm=None):
3597
key = (scheme, host)
3598
if not key in self._username:
3600
return { "scheme": scheme, "host": host, "port": port,
3601
"user": self._username[key], "password": self._password[key]}
3604
class CountingCredentialStore(config.CredentialStore):
3609
def get_credentials(self, scheme, host, port=None, user=None,
3610
path=None, realm=None):
3615
class TestCredentialStoreRegistry(tests.TestCase):
3617
def _get_cs_registry(self):
3618
return config.credential_store_registry
3620
def test_default_credential_store(self):
3621
r = self._get_cs_registry()
3622
default = r.get_credential_store(None)
3623
self.assertIsInstance(default, config.PlainTextCredentialStore)
3625
def test_unknown_credential_store(self):
3626
r = self._get_cs_registry()
3627
# It's hard to imagine someone creating a credential store named
3628
# 'unknown' so we use that as an never registered key.
3629
self.assertRaises(KeyError, r.get_credential_store, 'unknown')
3631
def test_fallback_none_registered(self):
3632
r = config.CredentialStoreRegistry()
3633
self.assertEquals(None,
3634
r.get_fallback_credentials("http", "example.com"))
3636
def test_register(self):
3637
r = config.CredentialStoreRegistry()
3638
r.register("stub", StubCredentialStore(), fallback=False)
3639
r.register("another", StubCredentialStore(), fallback=True)
3640
self.assertEquals(["another", "stub"], r.keys())
3642
def test_register_lazy(self):
3643
r = config.CredentialStoreRegistry()
3644
r.register_lazy("stub", "bzrlib.tests.test_config",
3645
"StubCredentialStore", fallback=False)
3646
self.assertEquals(["stub"], r.keys())
3647
self.assertIsInstance(r.get_credential_store("stub"),
3648
StubCredentialStore)
3650
def test_is_fallback(self):
3651
r = config.CredentialStoreRegistry()
3652
r.register("stub1", None, fallback=False)
3653
r.register("stub2", None, fallback=True)
3654
self.assertEquals(False, r.is_fallback("stub1"))
3655
self.assertEquals(True, r.is_fallback("stub2"))
3657
def test_no_fallback(self):
3658
r = config.CredentialStoreRegistry()
3659
store = CountingCredentialStore()
3660
r.register("count", store, fallback=False)
3661
self.assertEquals(None,
3662
r.get_fallback_credentials("http", "example.com"))
3663
self.assertEquals(0, store._calls)
3665
def test_fallback_credentials(self):
3666
r = config.CredentialStoreRegistry()
3667
store = StubCredentialStore()
3668
store.add_credentials("http", "example.com",
3669
"somebody", "geheim")
3670
r.register("stub", store, fallback=True)
3671
creds = r.get_fallback_credentials("http", "example.com")
3672
self.assertEquals("somebody", creds["user"])
3673
self.assertEquals("geheim", creds["password"])
3675
def test_fallback_first_wins(self):
3676
r = config.CredentialStoreRegistry()
3677
stub1 = StubCredentialStore()
3678
stub1.add_credentials("http", "example.com",
3679
"somebody", "stub1")
3680
r.register("stub1", stub1, fallback=True)
3681
stub2 = StubCredentialStore()
3682
stub2.add_credentials("http", "example.com",
3683
"somebody", "stub2")
3684
r.register("stub2", stub1, fallback=True)
3685
creds = r.get_fallback_credentials("http", "example.com")
3686
self.assertEquals("somebody", creds["user"])
3687
self.assertEquals("stub1", creds["password"])
3690
class TestPlainTextCredentialStore(tests.TestCase):
3692
def test_decode_password(self):
3693
r = config.credential_store_registry
3694
plain_text = r.get_credential_store()
3695
decoded = plain_text.decode_password(dict(password='secret'))
3696
self.assertEquals('secret', decoded)
3699
# FIXME: Once we have a way to declare authentication to all test servers, we
3700
# can implement generic tests.
3701
# test_user_password_in_url
3702
# test_user_in_url_password_from_config
3703
# test_user_in_url_password_prompted
3704
# test_user_in_config
3705
# test_user_getpass.getuser
3706
# test_user_prompted ?
3707
class TestAuthenticationRing(tests.TestCaseWithTransport):
3711
class TestAutoUserId(tests.TestCase):
3712
"""Test inferring an automatic user name."""
3714
def test_auto_user_id(self):
3715
"""Automatic inference of user name.
3717
This is a bit hard to test in an isolated way, because it depends on
3718
system functions that go direct to /etc or perhaps somewhere else.
3719
But it's reasonable to say that on Unix, with an /etc/mailname, we ought
3720
to be able to choose a user name with no configuration.
3722
if sys.platform == 'win32':
3723
raise tests.TestSkipped(
3724
"User name inference not implemented on win32")
3725
realname, address = config._auto_user_id()
3726
if os.path.exists('/etc/mailname'):
3727
self.assertIsNot(None, realname)
3728
self.assertIsNot(None, address)
3730
self.assertEquals((None, None), (realname, address))