~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_config.py

  • Committer: John Arbash Meinel
  • Date: 2011-01-12 21:27:00 UTC
  • mto: This revision was merged to the branch mainline in revision 5605.
  • Revision ID: john@arbash-meinel.com-20110112212700-esqmtrmevddxrsq2
Clean up the test slightly, hoping to avoid race conditions, update NEWS

Show diffs side-by-side

added added

removed removed

Lines of Context:
33
33
    errors,
34
34
    osutils,
35
35
    mail_client,
36
 
    mergetools,
37
36
    ui,
38
37
    urlutils,
39
 
    registry,
40
 
    remote,
41
38
    tests,
42
39
    trace,
43
40
    transport,
44
41
    )
45
 
from bzrlib.symbol_versioning import (
46
 
    deprecated_in,
47
 
    deprecated_method,
48
 
    )
49
 
from bzrlib.transport import remote as transport_remote
50
42
from bzrlib.tests import (
51
43
    features,
52
44
    scenarios,
53
 
    test_server,
54
45
    )
55
46
from bzrlib.util.configobj import configobj
56
47
 
69
60
 
70
61
load_tests = scenarios.load_tests_apply_scenarios
71
62
 
72
 
# Register helpers to build stores
73
 
config.test_store_builder_registry.register(
74
 
    'configobj', lambda test: config.IniFileStore(test.get_transport(),
75
 
                                                  'configobj.conf'))
76
 
config.test_store_builder_registry.register(
77
 
    'bazaar', lambda test: config.GlobalStore())
78
 
config.test_store_builder_registry.register(
79
 
    'location', lambda test: config.LocationStore())
80
 
 
81
 
 
82
 
def build_backing_branch(test, relpath,
83
 
                         transport_class=None, server_class=None):
84
 
    """Test helper to create a backing branch only once.
85
 
 
86
 
    Some tests needs multiple stores/stacks to check concurrent update
87
 
    behaviours. As such, they need to build different branch *objects* even if
88
 
    they share the branch on disk.
89
 
 
90
 
    :param relpath: The relative path to the branch. (Note that the helper
91
 
        should always specify the same relpath).
92
 
 
93
 
    :param transport_class: The Transport class the test needs to use.
94
 
 
95
 
    :param server_class: The server associated with the ``transport_class``
96
 
        above.
97
 
 
98
 
    Either both or neither of ``transport_class`` and ``server_class`` should
99
 
    be specified.
100
 
    """
101
 
    if transport_class is not None and server_class is not None:
102
 
        test.transport_class = transport_class
103
 
        test.transport_server = server_class
104
 
    elif not (transport_class is None and server_class is None):
105
 
        raise AssertionError('Specify both ``transport_class`` and '
106
 
                             '``server_class`` or neither of them')
107
 
    if getattr(test, 'backing_branch', None) is None:
108
 
        # First call, let's build the branch on disk
109
 
        test.backing_branch = test.make_branch(relpath)
110
 
 
111
 
 
112
 
def build_branch_store(test):
113
 
    build_backing_branch(test, 'branch')
114
 
    b = branch.Branch.open('branch')
115
 
    return config.BranchStore(b)
116
 
config.test_store_builder_registry.register('branch', build_branch_store)
117
 
 
118
 
 
119
 
def build_remote_branch_store(test):
120
 
    # There is only one permutation (but we won't be able to handle more with
121
 
    # this design anyway)
122
 
    (transport_class,
123
 
     server_class) = transport_remote.get_test_permutations()[0]
124
 
    build_backing_branch(test, 'branch', transport_class, server_class)
125
 
    b = branch.Branch.open(test.get_url('branch'))
126
 
    return config.BranchStore(b)
127
 
config.test_store_builder_registry.register('remote_branch',
128
 
                                            build_remote_branch_store)
129
 
 
130
 
 
131
 
config.test_stack_builder_registry.register(
132
 
    'bazaar', lambda test: config.GlobalStack())
133
 
config.test_stack_builder_registry.register(
134
 
    'location', lambda test: config.LocationStack('.'))
135
 
 
136
 
 
137
 
def build_branch_stack(test):
138
 
    build_backing_branch(test, 'branch')
139
 
    b = branch.Branch.open('branch')
140
 
    return config.BranchStack(b)
141
 
config.test_stack_builder_registry.register('branch', build_branch_stack)
142
 
 
143
 
 
144
 
def build_remote_branch_stack(test):
145
 
    # There is only one permutation (but we won't be able to handle more with
146
 
    # this design anyway)
147
 
    (transport_class,
148
 
     server_class) = transport_remote.get_test_permutations()[0]
149
 
    build_backing_branch(test, 'branch', transport_class, server_class)
150
 
    b = branch.Branch.open(test.get_url('branch'))
151
 
    return config.BranchStack(b)
152
 
config.test_stack_builder_registry.register('remote_branch',
153
 
                                            build_remote_branch_stack)
154
 
 
155
63
 
156
64
sample_long_alias="log -r-15..-1 --line"
157
65
sample_config_text = u"""
161
69
change_editor=vimdiff -of @new_path @old_path
162
70
gpg_signing_command=gnome-gpg
163
71
log_format=short
164
 
validate_signatures_in_log=true
165
 
acceptable_keys=amy
166
72
user_global_option=something
167
 
bzr.mergetool.sometool=sometool {base} {this} {other} -o {result}
168
 
bzr.mergetool.funkytool=funkytool "arg with spaces" {this_temp}
169
 
bzr.default_mergetool=sometool
170
73
[ALIASES]
171
74
h=help
172
75
ll=""" + sample_long_alias + "\n"
406
309
        """
407
310
        co = config.ConfigObj()
408
311
        co['test'] = 'foo#bar'
409
 
        outfile = StringIO()
410
 
        co.write(outfile=outfile)
411
 
        lines = outfile.getvalue().splitlines()
 
312
        lines = co.write()
412
313
        self.assertEqual(lines, ['test = "foo#bar"'])
413
314
        co2 = config.ConfigObj(lines)
414
315
        self.assertEqual(co2['test'], 'foo#bar')
415
316
 
416
 
    def test_triple_quotes(self):
417
 
        # Bug #710410: if the value string has triple quotes
418
 
        # then ConfigObj versions up to 4.7.2 will quote them wrong
419
 
        # and won't able to read them back
420
 
        triple_quotes_value = '''spam
421
 
""" that's my spam """
422
 
eggs'''
423
 
        co = config.ConfigObj()
424
 
        co['test'] = triple_quotes_value
425
 
        # While writing this test another bug in ConfigObj has been found:
426
 
        # method co.write() without arguments produces list of lines
427
 
        # one option per line, and multiline values are not split
428
 
        # across multiple lines,
429
 
        # and that breaks the parsing these lines back by ConfigObj.
430
 
        # This issue only affects test, but it's better to avoid
431
 
        # `co.write()` construct at all.
432
 
        # [bialix 20110222] bug report sent to ConfigObj's author
433
 
        outfile = StringIO()
434
 
        co.write(outfile=outfile)
435
 
        output = outfile.getvalue()
436
 
        # now we're trying to read it back
437
 
        co2 = config.ConfigObj(StringIO(output))
438
 
        self.assertEquals(triple_quotes_value, co2['test'])
439
 
 
440
317
 
441
318
erroneous_config = """[section] # line 1
442
319
good=good # line 2
463
340
        config.Config()
464
341
 
465
342
    def test_no_default_editor(self):
466
 
        self.assertRaises(
467
 
            NotImplementedError,
468
 
            self.applyDeprecated, deprecated_in((2, 4, 0)),
469
 
            config.Config().get_editor)
 
343
        self.assertRaises(NotImplementedError, config.Config().get_editor)
470
344
 
471
345
    def test_user_email(self):
472
346
        my_config = InstrumentedConfig()
515
389
        my_config = config.Config()
516
390
        self.assertEqual('long', my_config.log_format())
517
391
 
518
 
    def test_acceptable_keys_default(self):
519
 
        my_config = config.Config()
520
 
        self.assertEqual(None, my_config.acceptable_keys())
521
 
 
522
 
    def test_validate_signatures_in_log_default(self):
523
 
        my_config = config.Config()
524
 
        self.assertEqual(False, my_config.validate_signatures_in_log())
525
 
 
526
392
    def test_get_change_editor(self):
527
393
        my_config = InstrumentedConfig()
528
394
        change_editor = my_config.get_change_editor('old_tree', 'new_tree')
613
479
    def test_cached(self):
614
480
        my_config = config.IniBasedConfig.from_string(sample_config_text)
615
481
        parser = my_config._get_parser()
616
 
        self.assertTrue(my_config._get_parser() is parser)
 
482
        self.failUnless(my_config._get_parser() is parser)
617
483
 
618
484
    def _dummy_chown(self, path, uid, gid):
619
485
        self.path, self.uid, self.gid = path, uid, gid
644
510
            ' Use IniBasedConfig(_content=xxx) instead.'],
645
511
            conf._get_parser, file=config_file)
646
512
 
647
 
 
648
513
class TestIniConfigSaving(tests.TestCaseInTempDir):
649
514
 
650
515
    def test_cant_save_without_a_file_name(self):
658
523
        self.assertFileEqual(content, 'test.conf')
659
524
 
660
525
 
661
 
class TestIniConfigOptionExpansionDefaultValue(tests.TestCaseInTempDir):
662
 
    """What is the default value of expand for config options.
663
 
 
664
 
    This is an opt-in beta feature used to evaluate whether or not option
665
 
    references can appear in dangerous place raising exceptions, disapearing
666
 
    (and as such corrupting data) or if it's safe to activate the option by
667
 
    default.
668
 
 
669
 
    Note that these tests relies on config._expand_default_value being already
670
 
    overwritten in the parent class setUp.
671
 
    """
672
 
 
673
 
    def setUp(self):
674
 
        super(TestIniConfigOptionExpansionDefaultValue, self).setUp()
675
 
        self.config = None
676
 
        self.warnings = []
677
 
        def warning(*args):
678
 
            self.warnings.append(args[0] % args[1:])
679
 
        self.overrideAttr(trace, 'warning', warning)
680
 
 
681
 
    def get_config(self, expand):
682
 
        c = config.GlobalConfig.from_string('bzr.config.expand=%s' % (expand,),
683
 
                                            save=True)
684
 
        return c
685
 
 
686
 
    def assertExpandIs(self, expected):
687
 
        actual = config._get_expand_default_value()
688
 
        #self.config.get_user_option_as_bool('bzr.config.expand')
689
 
        self.assertEquals(expected, actual)
690
 
 
691
 
    def test_default_is_None(self):
692
 
        self.assertEquals(None, config._expand_default_value)
693
 
 
694
 
    def test_default_is_False_even_if_None(self):
695
 
        self.config = self.get_config(None)
696
 
        self.assertExpandIs(False)
697
 
 
698
 
    def test_default_is_False_even_if_invalid(self):
699
 
        self.config = self.get_config('<your choice>')
700
 
        self.assertExpandIs(False)
701
 
        # ...
702
 
        # Huh ? My choice is False ? Thanks, always happy to hear that :D
703
 
        # Wait, you've been warned !
704
 
        self.assertLength(1, self.warnings)
705
 
        self.assertEquals(
706
 
            'Value "<your choice>" is not a boolean for "bzr.config.expand"',
707
 
            self.warnings[0])
708
 
 
709
 
    def test_default_is_True(self):
710
 
        self.config = self.get_config(True)
711
 
        self.assertExpandIs(True)
712
 
 
713
 
    def test_default_is_False(self):
714
 
        self.config = self.get_config(False)
715
 
        self.assertExpandIs(False)
716
 
 
717
 
 
718
 
class TestIniConfigOptionExpansion(tests.TestCase):
719
 
    """Test option expansion from the IniConfig level.
720
 
 
721
 
    What we really want here is to test the Config level, but the class being
722
 
    abstract as far as storing values is concerned, this can't be done
723
 
    properly (yet).
724
 
    """
725
 
    # FIXME: This should be rewritten when all configs share a storage
726
 
    # implementation -- vila 2011-02-18
727
 
 
728
 
    def get_config(self, string=None):
729
 
        if string is None:
730
 
            string = ''
731
 
        c = config.IniBasedConfig.from_string(string)
732
 
        return c
733
 
 
734
 
    def assertExpansion(self, expected, conf, string, env=None):
735
 
        self.assertEquals(expected, conf.expand_options(string, env))
736
 
 
737
 
    def test_no_expansion(self):
738
 
        c = self.get_config('')
739
 
        self.assertExpansion('foo', c, 'foo')
740
 
 
741
 
    def test_env_adding_options(self):
742
 
        c = self.get_config('')
743
 
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
744
 
 
745
 
    def test_env_overriding_options(self):
746
 
        c = self.get_config('foo=baz')
747
 
        self.assertExpansion('bar', c, '{foo}', {'foo': 'bar'})
748
 
 
749
 
    def test_simple_ref(self):
750
 
        c = self.get_config('foo=xxx')
751
 
        self.assertExpansion('xxx', c, '{foo}')
752
 
 
753
 
    def test_unknown_ref(self):
754
 
        c = self.get_config('')
755
 
        self.assertRaises(errors.ExpandingUnknownOption,
756
 
                          c.expand_options, '{foo}')
757
 
 
758
 
    def test_indirect_ref(self):
759
 
        c = self.get_config('''
760
 
foo=xxx
761
 
bar={foo}
762
 
''')
763
 
        self.assertExpansion('xxx', c, '{bar}')
764
 
 
765
 
    def test_embedded_ref(self):
766
 
        c = self.get_config('''
767
 
foo=xxx
768
 
bar=foo
769
 
''')
770
 
        self.assertExpansion('xxx', c, '{{bar}}')
771
 
 
772
 
    def test_simple_loop(self):
773
 
        c = self.get_config('foo={foo}')
774
 
        self.assertRaises(errors.OptionExpansionLoop, c.expand_options, '{foo}')
775
 
 
776
 
    def test_indirect_loop(self):
777
 
        c = self.get_config('''
778
 
foo={bar}
779
 
bar={baz}
780
 
baz={foo}''')
781
 
        e = self.assertRaises(errors.OptionExpansionLoop,
782
 
                              c.expand_options, '{foo}')
783
 
        self.assertEquals('foo->bar->baz', e.refs)
784
 
        self.assertEquals('{foo}', e.string)
785
 
 
786
 
    def test_list(self):
787
 
        conf = self.get_config('''
788
 
foo=start
789
 
bar=middle
790
 
baz=end
791
 
list={foo},{bar},{baz}
792
 
''')
793
 
        self.assertEquals(['start', 'middle', 'end'],
794
 
                           conf.get_user_option('list', expand=True))
795
 
 
796
 
    def test_cascading_list(self):
797
 
        conf = self.get_config('''
798
 
foo=start,{bar}
799
 
bar=middle,{baz}
800
 
baz=end
801
 
list={foo}
802
 
''')
803
 
        self.assertEquals(['start', 'middle', 'end'],
804
 
                           conf.get_user_option('list', expand=True))
805
 
 
806
 
    def test_pathological_hidden_list(self):
807
 
        conf = self.get_config('''
808
 
foo=bin
809
 
bar=go
810
 
start={foo
811
 
middle=},{
812
 
end=bar}
813
 
hidden={start}{middle}{end}
814
 
''')
815
 
        # Nope, it's either a string or a list, and the list wins as soon as a
816
 
        # ',' appears, so the string concatenation never occur.
817
 
        self.assertEquals(['{foo', '}', '{', 'bar}'],
818
 
                          conf.get_user_option('hidden', expand=True))
819
 
 
820
 
class TestLocationConfigOptionExpansion(tests.TestCaseInTempDir):
821
 
 
822
 
    def get_config(self, location, string=None):
823
 
        if string is None:
824
 
            string = ''
825
 
        # Since we don't save the config we won't strictly require to inherit
826
 
        # from TestCaseInTempDir, but an error occurs so quickly...
827
 
        c = config.LocationConfig.from_string(string, location)
828
 
        return c
829
 
 
830
 
    def test_dont_cross_unrelated_section(self):
831
 
        c = self.get_config('/another/branch/path','''
832
 
[/one/branch/path]
833
 
foo = hello
834
 
bar = {foo}/2
835
 
 
836
 
[/another/branch/path]
837
 
bar = {foo}/2
838
 
''')
839
 
        self.assertRaises(errors.ExpandingUnknownOption,
840
 
                          c.get_user_option, 'bar', expand=True)
841
 
 
842
 
    def test_cross_related_sections(self):
843
 
        c = self.get_config('/project/branch/path','''
844
 
[/project]
845
 
foo = qu
846
 
 
847
 
[/project/branch/path]
848
 
bar = {foo}ux
849
 
''')
850
 
        self.assertEquals('quux', c.get_user_option('bar', expand=True))
851
 
 
852
 
 
853
526
class TestIniBaseConfigOnDisk(tests.TestCaseInTempDir):
854
527
 
855
528
    def test_cannot_reload_without_name(self):
935
608
        def c1_write_config_file():
936
609
            before_writing.set()
937
610
            c1_orig()
938
 
            # The lock is held. We wait for the main thread to decide when to
 
611
            # The lock is held we wait for the main thread to decide when to
939
612
            # continue
940
613
            after_writing.wait()
941
614
        c1._write_config_file = c1_write_config_file
968
641
       c1_orig = c1._write_config_file
969
642
       def c1_write_config_file():
970
643
           ready_to_write.set()
971
 
           # The lock is held. We wait for the main thread to decide when to
 
644
           # The lock is held we wait for the main thread to decide when to
972
645
           # continue
973
646
           do_writing.wait()
974
647
           c1_orig()
1065
738
            parser = my_config._get_parser()
1066
739
        finally:
1067
740
            config.ConfigObj = oldparserclass
1068
 
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
741
        self.failUnless(isinstance(parser, InstrumentedConfigObj))
1069
742
        self.assertEqual(parser._calls, [('__init__', config.config_filename(),
1070
743
                                          'utf-8')])
1071
744
 
1082
755
        my_config = config.BranchConfig(branch)
1083
756
        location_config = my_config._get_location_config()
1084
757
        self.assertEqual(branch.base, location_config.location)
1085
 
        self.assertIs(location_config, my_config._get_location_config())
 
758
        self.failUnless(location_config is my_config._get_location_config())
1086
759
 
1087
760
    def test_get_config(self):
1088
761
        """The Branch.get_config method works properly"""
1188
861
 
1189
862
    def test_configured_editor(self):
1190
863
        my_config = config.GlobalConfig.from_string(sample_config_text)
1191
 
        editor = self.applyDeprecated(
1192
 
            deprecated_in((2, 4, 0)), my_config.get_editor)
1193
 
        self.assertEqual('vim', editor)
 
864
        self.assertEqual("vim", my_config.get_editor())
1194
865
 
1195
866
    def test_signatures_always(self):
1196
867
        my_config = config.GlobalConfig.from_string(sample_always_signatures)
1250
921
        my_config = self._get_sample_config()
1251
922
        self.assertEqual("short", my_config.log_format())
1252
923
 
1253
 
    def test_configured_acceptable_keys(self):
1254
 
        my_config = self._get_sample_config()
1255
 
        self.assertEqual("amy", my_config.acceptable_keys())
1256
 
 
1257
 
    def test_configured_validate_signatures_in_log(self):
1258
 
        my_config = self._get_sample_config()
1259
 
        self.assertEqual(True, my_config.validate_signatures_in_log())
1260
 
 
1261
924
    def test_get_alias(self):
1262
925
        my_config = self._get_sample_config()
1263
926
        self.assertEqual('help', my_config.get_alias('h'))
1290
953
        change_editor = my_config.get_change_editor('old', 'new')
1291
954
        self.assertIs(None, change_editor)
1292
955
 
1293
 
    def test_get_merge_tools(self):
1294
 
        conf = self._get_sample_config()
1295
 
        tools = conf.get_merge_tools()
1296
 
        self.log(repr(tools))
1297
 
        self.assertEqual(
1298
 
            {u'funkytool' : u'funkytool "arg with spaces" {this_temp}',
1299
 
            u'sometool' : u'sometool {base} {this} {other} -o {result}'},
1300
 
            tools)
1301
 
 
1302
 
    def test_get_merge_tools_empty(self):
1303
 
        conf = self._get_empty_config()
1304
 
        tools = conf.get_merge_tools()
1305
 
        self.assertEqual({}, tools)
1306
 
 
1307
 
    def test_find_merge_tool(self):
1308
 
        conf = self._get_sample_config()
1309
 
        cmdline = conf.find_merge_tool('sometool')
1310
 
        self.assertEqual('sometool {base} {this} {other} -o {result}', cmdline)
1311
 
 
1312
 
    def test_find_merge_tool_not_found(self):
1313
 
        conf = self._get_sample_config()
1314
 
        cmdline = conf.find_merge_tool('DOES NOT EXIST')
1315
 
        self.assertIs(cmdline, None)
1316
 
 
1317
 
    def test_find_merge_tool_known(self):
1318
 
        conf = self._get_empty_config()
1319
 
        cmdline = conf.find_merge_tool('kdiff3')
1320
 
        self.assertEquals('kdiff3 {base} {this} {other} -o {result}', cmdline)
1321
 
 
1322
 
    def test_find_merge_tool_override_known(self):
1323
 
        conf = self._get_empty_config()
1324
 
        conf.set_user_option('bzr.mergetool.kdiff3', 'kdiff3 blah')
1325
 
        cmdline = conf.find_merge_tool('kdiff3')
1326
 
        self.assertEqual('kdiff3 blah', cmdline)
1327
 
 
1328
956
 
1329
957
class TestGlobalConfigSavingOptions(tests.TestCaseInTempDir):
1330
958
 
1366
994
            parser = my_config._get_parser()
1367
995
        finally:
1368
996
            config.ConfigObj = oldparserclass
1369
 
        self.assertIsInstance(parser, InstrumentedConfigObj)
 
997
        self.failUnless(isinstance(parser, InstrumentedConfigObj))
1370
998
        self.assertEqual(parser._calls,
1371
999
                         [('__init__', config.locations_config_filename(),
1372
1000
                           'utf-8')])
1374
1002
    def test_get_global_config(self):
1375
1003
        my_config = config.BranchConfig(FakeBranch('http://example.com'))
1376
1004
        global_config = my_config._get_global_config()
1377
 
        self.assertIsInstance(global_config, config.GlobalConfig)
1378
 
        self.assertIs(global_config, my_config._get_global_config())
1379
 
 
1380
 
    def assertLocationMatching(self, expected):
1381
 
        self.assertEqual(expected,
1382
 
                         list(self.my_location_config._get_matching_sections()))
 
1005
        self.failUnless(isinstance(global_config, config.GlobalConfig))
 
1006
        self.failUnless(global_config is my_config._get_global_config())
1383
1007
 
1384
1008
    def test__get_matching_sections_no_match(self):
1385
1009
        self.get_branch_config('/')
1386
 
        self.assertLocationMatching([])
 
1010
        self.assertEqual([], self.my_location_config._get_matching_sections())
1387
1011
 
1388
1012
    def test__get_matching_sections_exact(self):
1389
1013
        self.get_branch_config('http://www.example.com')
1390
 
        self.assertLocationMatching([('http://www.example.com', '')])
 
1014
        self.assertEqual([('http://www.example.com', '')],
 
1015
                         self.my_location_config._get_matching_sections())
1391
1016
 
1392
1017
    def test__get_matching_sections_suffix_does_not(self):
1393
1018
        self.get_branch_config('http://www.example.com-com')
1394
 
        self.assertLocationMatching([])
 
1019
        self.assertEqual([], self.my_location_config._get_matching_sections())
1395
1020
 
1396
1021
    def test__get_matching_sections_subdir_recursive(self):
1397
1022
        self.get_branch_config('http://www.example.com/com')
1398
 
        self.assertLocationMatching([('http://www.example.com', 'com')])
 
1023
        self.assertEqual([('http://www.example.com', 'com')],
 
1024
                         self.my_location_config._get_matching_sections())
1399
1025
 
1400
1026
    def test__get_matching_sections_ignoreparent(self):
1401
1027
        self.get_branch_config('http://www.example.com/ignoreparent')
1402
 
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
1403
 
                                      '')])
 
1028
        self.assertEqual([('http://www.example.com/ignoreparent', '')],
 
1029
                         self.my_location_config._get_matching_sections())
1404
1030
 
1405
1031
    def test__get_matching_sections_ignoreparent_subdir(self):
1406
1032
        self.get_branch_config(
1407
1033
            'http://www.example.com/ignoreparent/childbranch')
1408
 
        self.assertLocationMatching([('http://www.example.com/ignoreparent',
1409
 
                                      'childbranch')])
 
1034
        self.assertEqual([('http://www.example.com/ignoreparent',
 
1035
                           'childbranch')],
 
1036
                         self.my_location_config._get_matching_sections())
1410
1037
 
1411
1038
    def test__get_matching_sections_subdir_trailing_slash(self):
1412
1039
        self.get_branch_config('/b')
1413
 
        self.assertLocationMatching([('/b/', '')])
 
1040
        self.assertEqual([('/b/', '')],
 
1041
                         self.my_location_config._get_matching_sections())
1414
1042
 
1415
1043
    def test__get_matching_sections_subdir_child(self):
1416
1044
        self.get_branch_config('/a/foo')
1417
 
        self.assertLocationMatching([('/a/*', ''), ('/a/', 'foo')])
 
1045
        self.assertEqual([('/a/*', ''), ('/a/', 'foo')],
 
1046
                         self.my_location_config._get_matching_sections())
1418
1047
 
1419
1048
    def test__get_matching_sections_subdir_child_child(self):
1420
1049
        self.get_branch_config('/a/foo/bar')
1421
 
        self.assertLocationMatching([('/a/*', 'bar'), ('/a/', 'foo/bar')])
 
1050
        self.assertEqual([('/a/*', 'bar'), ('/a/', 'foo/bar')],
 
1051
                         self.my_location_config._get_matching_sections())
1422
1052
 
1423
1053
    def test__get_matching_sections_trailing_slash_with_children(self):
1424
1054
        self.get_branch_config('/a/')
1425
 
        self.assertLocationMatching([('/a/', '')])
 
1055
        self.assertEqual([('/a/', '')],
 
1056
                         self.my_location_config._get_matching_sections())
1426
1057
 
1427
1058
    def test__get_matching_sections_explicit_over_glob(self):
1428
1059
        # XXX: 2006-09-08 jamesh
1430
1061
        # was a config section for '/a/?', it would get precedence
1431
1062
        # over '/a/c'.
1432
1063
        self.get_branch_config('/a/c')
1433
 
        self.assertLocationMatching([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')])
 
1064
        self.assertEqual([('/a/c', ''), ('/a/*', ''), ('/a/', 'c')],
 
1065
                         self.my_location_config._get_matching_sections())
1434
1066
 
1435
1067
    def test__get_option_policy_normal(self):
1436
1068
        self.get_branch_config('http://www.example.com')
1889
1521
 
1890
1522
class TestTransportConfig(tests.TestCaseWithTransport):
1891
1523
 
1892
 
    def test_load_utf8(self):
1893
 
        """Ensure we can load an utf8-encoded file."""
1894
 
        t = self.get_transport()
1895
 
        unicode_user = u'b\N{Euro Sign}ar'
1896
 
        unicode_content = u'user=%s' % (unicode_user,)
1897
 
        utf8_content = unicode_content.encode('utf8')
1898
 
        # Store the raw content in the config file
1899
 
        t.put_bytes('foo.conf', utf8_content)
1900
 
        conf = config.TransportConfig(t, 'foo.conf')
1901
 
        self.assertEquals(unicode_user, conf.get_option('user'))
1902
 
 
1903
 
    def test_load_non_ascii(self):
1904
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
1905
 
        t = self.get_transport()
1906
 
        t.put_bytes('foo.conf', 'user=foo\n#\xff\n')
1907
 
        conf = config.TransportConfig(t, 'foo.conf')
1908
 
        self.assertRaises(errors.ConfigContentError, conf._get_configobj)
1909
 
 
1910
 
    def test_load_erroneous_content(self):
1911
 
        """Ensure we display a proper error on content that can't be parsed."""
1912
 
        t = self.get_transport()
1913
 
        t.put_bytes('foo.conf', '[open_section\n')
1914
 
        conf = config.TransportConfig(t, 'foo.conf')
1915
 
        self.assertRaises(errors.ParseConfigError, conf._get_configobj)
1916
 
 
1917
1524
    def test_get_value(self):
1918
1525
        """Test that retreiving a value from a section is possible"""
1919
 
        bzrdir_config = config.TransportConfig(self.get_transport('.'),
 
1526
        bzrdir_config = config.TransportConfig(transport.get_transport('.'),
1920
1527
                                               'control.conf')
1921
1528
        bzrdir_config.set_option('value', 'key', 'SECTION')
1922
1529
        bzrdir_config.set_option('value2', 'key2')
1952
1559
        self.assertIs(None, bzrdir_config.get_default_stack_on())
1953
1560
 
1954
1561
 
1955
 
class TestOldConfigHooks(tests.TestCaseWithTransport):
1956
 
 
1957
 
    def setUp(self):
1958
 
        super(TestOldConfigHooks, self).setUp()
1959
 
        create_configs_with_file_option(self)
1960
 
 
1961
 
    def assertGetHook(self, conf, name, value):
1962
 
        calls = []
1963
 
        def hook(*args):
1964
 
            calls.append(args)
1965
 
        config.OldConfigHooks.install_named_hook('get', hook, None)
1966
 
        self.addCleanup(
1967
 
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
1968
 
        self.assertLength(0, calls)
1969
 
        actual_value = conf.get_user_option(name)
1970
 
        self.assertEquals(value, actual_value)
1971
 
        self.assertLength(1, calls)
1972
 
        self.assertEquals((conf, name, value), calls[0])
1973
 
 
1974
 
    def test_get_hook_bazaar(self):
1975
 
        self.assertGetHook(self.bazaar_config, 'file', 'bazaar')
1976
 
 
1977
 
    def test_get_hook_locations(self):
1978
 
        self.assertGetHook(self.locations_config, 'file', 'locations')
1979
 
 
1980
 
    def test_get_hook_branch(self):
1981
 
        # Since locations masks branch, we define a different option
1982
 
        self.branch_config.set_user_option('file2', 'branch')
1983
 
        self.assertGetHook(self.branch_config, 'file2', 'branch')
1984
 
 
1985
 
    def assertSetHook(self, conf, name, value):
1986
 
        calls = []
1987
 
        def hook(*args):
1988
 
            calls.append(args)
1989
 
        config.OldConfigHooks.install_named_hook('set', hook, None)
1990
 
        self.addCleanup(
1991
 
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
1992
 
        self.assertLength(0, calls)
1993
 
        conf.set_user_option(name, value)
1994
 
        self.assertLength(1, calls)
1995
 
        # We can't assert the conf object below as different configs use
1996
 
        # different means to implement set_user_option and we care only about
1997
 
        # coverage here.
1998
 
        self.assertEquals((name, value), calls[0][1:])
1999
 
 
2000
 
    def test_set_hook_bazaar(self):
2001
 
        self.assertSetHook(self.bazaar_config, 'foo', 'bazaar')
2002
 
 
2003
 
    def test_set_hook_locations(self):
2004
 
        self.assertSetHook(self.locations_config, 'foo', 'locations')
2005
 
 
2006
 
    def test_set_hook_branch(self):
2007
 
        self.assertSetHook(self.branch_config, 'foo', 'branch')
2008
 
 
2009
 
    def assertRemoveHook(self, conf, name, section_name=None):
2010
 
        calls = []
2011
 
        def hook(*args):
2012
 
            calls.append(args)
2013
 
        config.OldConfigHooks.install_named_hook('remove', hook, None)
2014
 
        self.addCleanup(
2015
 
            config.OldConfigHooks.uninstall_named_hook, 'remove', None)
2016
 
        self.assertLength(0, calls)
2017
 
        conf.remove_user_option(name, section_name)
2018
 
        self.assertLength(1, calls)
2019
 
        # We can't assert the conf object below as different configs use
2020
 
        # different means to implement remove_user_option and we care only about
2021
 
        # coverage here.
2022
 
        self.assertEquals((name,), calls[0][1:])
2023
 
 
2024
 
    def test_remove_hook_bazaar(self):
2025
 
        self.assertRemoveHook(self.bazaar_config, 'file')
2026
 
 
2027
 
    def test_remove_hook_locations(self):
2028
 
        self.assertRemoveHook(self.locations_config, 'file',
2029
 
                              self.locations_config.location)
2030
 
 
2031
 
    def test_remove_hook_branch(self):
2032
 
        self.assertRemoveHook(self.branch_config, 'file')
2033
 
 
2034
 
    def assertLoadHook(self, name, conf_class, *conf_args):
2035
 
        calls = []
2036
 
        def hook(*args):
2037
 
            calls.append(args)
2038
 
        config.OldConfigHooks.install_named_hook('load', hook, None)
2039
 
        self.addCleanup(
2040
 
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
2041
 
        self.assertLength(0, calls)
2042
 
        # Build a config
2043
 
        conf = conf_class(*conf_args)
2044
 
        # Access an option to trigger a load
2045
 
        conf.get_user_option(name)
2046
 
        self.assertLength(1, calls)
2047
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2048
 
 
2049
 
    def test_load_hook_bazaar(self):
2050
 
        self.assertLoadHook('file', config.GlobalConfig)
2051
 
 
2052
 
    def test_load_hook_locations(self):
2053
 
        self.assertLoadHook('file', config.LocationConfig, self.tree.basedir)
2054
 
 
2055
 
    def test_load_hook_branch(self):
2056
 
        self.assertLoadHook('file', config.BranchConfig, self.tree.branch)
2057
 
 
2058
 
    def assertSaveHook(self, conf):
2059
 
        calls = []
2060
 
        def hook(*args):
2061
 
            calls.append(args)
2062
 
        config.OldConfigHooks.install_named_hook('save', hook, None)
2063
 
        self.addCleanup(
2064
 
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
2065
 
        self.assertLength(0, calls)
2066
 
        # Setting an option triggers a save
2067
 
        conf.set_user_option('foo', 'bar')
2068
 
        self.assertLength(1, calls)
2069
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2070
 
 
2071
 
    def test_save_hook_bazaar(self):
2072
 
        self.assertSaveHook(self.bazaar_config)
2073
 
 
2074
 
    def test_save_hook_locations(self):
2075
 
        self.assertSaveHook(self.locations_config)
2076
 
 
2077
 
    def test_save_hook_branch(self):
2078
 
        self.assertSaveHook(self.branch_config)
2079
 
 
2080
 
 
2081
 
class TestOldConfigHooksForRemote(tests.TestCaseWithTransport):
2082
 
    """Tests config hooks for remote configs.
2083
 
 
2084
 
    No tests for the remove hook as this is not implemented there.
2085
 
    """
2086
 
 
2087
 
    def setUp(self):
2088
 
        super(TestOldConfigHooksForRemote, self).setUp()
2089
 
        self.transport_server = test_server.SmartTCPServer_for_testing
2090
 
        create_configs_with_file_option(self)
2091
 
 
2092
 
    def assertGetHook(self, conf, name, value):
2093
 
        calls = []
2094
 
        def hook(*args):
2095
 
            calls.append(args)
2096
 
        config.OldConfigHooks.install_named_hook('get', hook, None)
2097
 
        self.addCleanup(
2098
 
            config.OldConfigHooks.uninstall_named_hook, 'get', None)
2099
 
        self.assertLength(0, calls)
2100
 
        actual_value = conf.get_option(name)
2101
 
        self.assertEquals(value, actual_value)
2102
 
        self.assertLength(1, calls)
2103
 
        self.assertEquals((conf, name, value), calls[0])
2104
 
 
2105
 
    def test_get_hook_remote_branch(self):
2106
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2107
 
        self.assertGetHook(remote_branch._get_config(), 'file', 'branch')
2108
 
 
2109
 
    def test_get_hook_remote_bzrdir(self):
2110
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2111
 
        conf = remote_bzrdir._get_config()
2112
 
        conf.set_option('remotedir', 'file')
2113
 
        self.assertGetHook(conf, 'file', 'remotedir')
2114
 
 
2115
 
    def assertSetHook(self, conf, name, value):
2116
 
        calls = []
2117
 
        def hook(*args):
2118
 
            calls.append(args)
2119
 
        config.OldConfigHooks.install_named_hook('set', hook, None)
2120
 
        self.addCleanup(
2121
 
            config.OldConfigHooks.uninstall_named_hook, 'set', None)
2122
 
        self.assertLength(0, calls)
2123
 
        conf.set_option(value, name)
2124
 
        self.assertLength(1, calls)
2125
 
        # We can't assert the conf object below as different configs use
2126
 
        # different means to implement set_user_option and we care only about
2127
 
        # coverage here.
2128
 
        self.assertEquals((name, value), calls[0][1:])
2129
 
 
2130
 
    def test_set_hook_remote_branch(self):
2131
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2132
 
        self.addCleanup(remote_branch.lock_write().unlock)
2133
 
        self.assertSetHook(remote_branch._get_config(), 'file', 'remote')
2134
 
 
2135
 
    def test_set_hook_remote_bzrdir(self):
2136
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2137
 
        self.addCleanup(remote_branch.lock_write().unlock)
2138
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2139
 
        self.assertSetHook(remote_bzrdir._get_config(), 'file', 'remotedir')
2140
 
 
2141
 
    def assertLoadHook(self, expected_nb_calls, name, conf_class, *conf_args):
2142
 
        calls = []
2143
 
        def hook(*args):
2144
 
            calls.append(args)
2145
 
        config.OldConfigHooks.install_named_hook('load', hook, None)
2146
 
        self.addCleanup(
2147
 
            config.OldConfigHooks.uninstall_named_hook, 'load', None)
2148
 
        self.assertLength(0, calls)
2149
 
        # Build a config
2150
 
        conf = conf_class(*conf_args)
2151
 
        # Access an option to trigger a load
2152
 
        conf.get_option(name)
2153
 
        self.assertLength(expected_nb_calls, calls)
2154
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2155
 
 
2156
 
    def test_load_hook_remote_branch(self):
2157
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2158
 
        self.assertLoadHook(1, 'file', remote.RemoteBranchConfig, remote_branch)
2159
 
 
2160
 
    def test_load_hook_remote_bzrdir(self):
2161
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2162
 
        # The config file doesn't exist, set an option to force its creation
2163
 
        conf = remote_bzrdir._get_config()
2164
 
        conf.set_option('remotedir', 'file')
2165
 
        # We get one call for the server and one call for the client, this is
2166
 
        # caused by the differences in implementations betwen
2167
 
        # SmartServerBzrDirRequestConfigFile (in smart/bzrdir.py) and
2168
 
        # SmartServerBranchGetConfigFile (in smart/branch.py)
2169
 
        self.assertLoadHook(2 ,'file', remote.RemoteBzrDirConfig, remote_bzrdir)
2170
 
 
2171
 
    def assertSaveHook(self, conf):
2172
 
        calls = []
2173
 
        def hook(*args):
2174
 
            calls.append(args)
2175
 
        config.OldConfigHooks.install_named_hook('save', hook, None)
2176
 
        self.addCleanup(
2177
 
            config.OldConfigHooks.uninstall_named_hook, 'save', None)
2178
 
        self.assertLength(0, calls)
2179
 
        # Setting an option triggers a save
2180
 
        conf.set_option('foo', 'bar')
2181
 
        self.assertLength(1, calls)
2182
 
        # Since we can't assert about conf, we just use the number of calls ;-/
2183
 
 
2184
 
    def test_save_hook_remote_branch(self):
2185
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2186
 
        self.addCleanup(remote_branch.lock_write().unlock)
2187
 
        self.assertSaveHook(remote_branch._get_config())
2188
 
 
2189
 
    def test_save_hook_remote_bzrdir(self):
2190
 
        remote_branch = branch.Branch.open(self.get_url('tree'))
2191
 
        self.addCleanup(remote_branch.lock_write().unlock)
2192
 
        remote_bzrdir = bzrdir.BzrDir.open(self.get_url('tree'))
2193
 
        self.assertSaveHook(remote_bzrdir._get_config())
2194
 
 
2195
 
 
2196
 
class TestOption(tests.TestCase):
2197
 
 
2198
 
    def test_default_value(self):
2199
 
        opt = config.Option('foo', default='bar')
2200
 
        self.assertEquals('bar', opt.get_default())
2201
 
 
2202
 
 
2203
 
class TestOptionRegistry(tests.TestCase):
2204
 
 
2205
 
    def setUp(self):
2206
 
        super(TestOptionRegistry, self).setUp()
2207
 
        # Always start with an empty registry
2208
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2209
 
        self.registry = config.option_registry
2210
 
 
2211
 
    def test_register(self):
2212
 
        opt = config.Option('foo')
2213
 
        self.registry.register('foo', opt)
2214
 
        self.assertIs(opt, self.registry.get('foo'))
2215
 
 
2216
 
    lazy_option = config.Option('lazy_foo')
2217
 
 
2218
 
    def test_register_lazy(self):
2219
 
        self.registry.register_lazy('foo', self.__module__,
2220
 
                                    'TestOptionRegistry.lazy_option')
2221
 
        self.assertIs(self.lazy_option, self.registry.get('foo'))
2222
 
 
2223
 
    def test_registered_help(self):
2224
 
        opt = config.Option('foo')
2225
 
        self.registry.register('foo', opt, help='A simple option')
2226
 
        self.assertEquals('A simple option', self.registry.get_help('foo'))
2227
 
 
2228
 
 
2229
 
class TestRegisteredOptions(tests.TestCase):
2230
 
    """All registered options should verify some constraints."""
2231
 
 
2232
 
    scenarios = [(key, {'option_name': key, 'option': option}) for key, option
2233
 
                 in config.option_registry.iteritems()]
2234
 
 
2235
 
    def setUp(self):
2236
 
        super(TestRegisteredOptions, self).setUp()
2237
 
        self.registry = config.option_registry
2238
 
 
2239
 
    def test_proper_name(self):
2240
 
        # An option should be registered under its own name, this can't be
2241
 
        # checked at registration time for the lazy ones.
2242
 
        self.assertEquals(self.option_name, self.option.name)
2243
 
 
2244
 
    def test_help_is_set(self):
2245
 
        option_help = self.registry.get_help(self.option_name)
2246
 
        self.assertNotEquals(None, option_help)
2247
 
        # Come on, think about the user, he really wants to know whst the
2248
 
        # option is about
2249
 
        self.assertNotEquals('', option_help)
2250
 
 
2251
 
 
2252
 
class TestSection(tests.TestCase):
2253
 
 
2254
 
    # FIXME: Parametrize so that all sections produced by Stores run these
2255
 
    # tests -- vila 2011-04-01
2256
 
 
2257
 
    def test_get_a_value(self):
2258
 
        a_dict = dict(foo='bar')
2259
 
        section = config.Section('myID', a_dict)
2260
 
        self.assertEquals('bar', section.get('foo'))
2261
 
 
2262
 
    def test_get_unknown_option(self):
2263
 
        a_dict = dict()
2264
 
        section = config.Section(None, a_dict)
2265
 
        self.assertEquals('out of thin air',
2266
 
                          section.get('foo', 'out of thin air'))
2267
 
 
2268
 
    def test_options_is_shared(self):
2269
 
        a_dict = dict()
2270
 
        section = config.Section(None, a_dict)
2271
 
        self.assertIs(a_dict, section.options)
2272
 
 
2273
 
 
2274
 
class TestMutableSection(tests.TestCase):
2275
 
 
2276
 
    # FIXME: Parametrize so that all sections (including os.environ and the
2277
 
    # ones produced by Stores) run these tests -- vila 2011-04-01
2278
 
 
2279
 
    def test_set(self):
2280
 
        a_dict = dict(foo='bar')
2281
 
        section = config.MutableSection('myID', a_dict)
2282
 
        section.set('foo', 'new_value')
2283
 
        self.assertEquals('new_value', section.get('foo'))
2284
 
        # The change appears in the shared section
2285
 
        self.assertEquals('new_value', a_dict.get('foo'))
2286
 
        # We keep track of the change
2287
 
        self.assertTrue('foo' in section.orig)
2288
 
        self.assertEquals('bar', section.orig.get('foo'))
2289
 
 
2290
 
    def test_set_preserve_original_once(self):
2291
 
        a_dict = dict(foo='bar')
2292
 
        section = config.MutableSection('myID', a_dict)
2293
 
        section.set('foo', 'first_value')
2294
 
        section.set('foo', 'second_value')
2295
 
        # We keep track of the original value
2296
 
        self.assertTrue('foo' in section.orig)
2297
 
        self.assertEquals('bar', section.orig.get('foo'))
2298
 
 
2299
 
    def test_remove(self):
2300
 
        a_dict = dict(foo='bar')
2301
 
        section = config.MutableSection('myID', a_dict)
2302
 
        section.remove('foo')
2303
 
        # We get None for unknown options via the default value
2304
 
        self.assertEquals(None, section.get('foo'))
2305
 
        # Or we just get the default value
2306
 
        self.assertEquals('unknown', section.get('foo', 'unknown'))
2307
 
        self.assertFalse('foo' in section.options)
2308
 
        # We keep track of the deletion
2309
 
        self.assertTrue('foo' in section.orig)
2310
 
        self.assertEquals('bar', section.orig.get('foo'))
2311
 
 
2312
 
    def test_remove_new_option(self):
2313
 
        a_dict = dict()
2314
 
        section = config.MutableSection('myID', a_dict)
2315
 
        section.set('foo', 'bar')
2316
 
        section.remove('foo')
2317
 
        self.assertFalse('foo' in section.options)
2318
 
        # The option didn't exist initially so it we need to keep track of it
2319
 
        # with a special value
2320
 
        self.assertTrue('foo' in section.orig)
2321
 
        self.assertEquals(config._NewlyCreatedOption, section.orig['foo'])
2322
 
 
2323
 
 
2324
 
class TestStore(tests.TestCaseWithTransport):
2325
 
 
2326
 
    def assertSectionContent(self, expected, section):
2327
 
        """Assert that some options have the proper values in a section."""
2328
 
        expected_name, expected_options = expected
2329
 
        self.assertEquals(expected_name, section.id)
2330
 
        self.assertEquals(
2331
 
            expected_options,
2332
 
            dict([(k, section.get(k)) for k in expected_options.keys()]))
2333
 
 
2334
 
 
2335
 
class TestReadonlyStore(TestStore):
2336
 
 
2337
 
    scenarios = [(key, {'get_store': builder}) for key, builder
2338
 
                 in config.test_store_builder_registry.iteritems()]
2339
 
 
2340
 
    def setUp(self):
2341
 
        super(TestReadonlyStore, self).setUp()
2342
 
 
2343
 
    def test_building_delays_load(self):
2344
 
        store = self.get_store(self)
2345
 
        self.assertEquals(False, store.is_loaded())
2346
 
        store._load_from_string('')
2347
 
        self.assertEquals(True, store.is_loaded())
2348
 
 
2349
 
    def test_get_no_sections_for_empty(self):
2350
 
        store = self.get_store(self)
2351
 
        store._load_from_string('')
2352
 
        self.assertEquals([], list(store.get_sections()))
2353
 
 
2354
 
    def test_get_default_section(self):
2355
 
        store = self.get_store(self)
2356
 
        store._load_from_string('foo=bar')
2357
 
        sections = list(store.get_sections())
2358
 
        self.assertLength(1, sections)
2359
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2360
 
 
2361
 
    def test_get_named_section(self):
2362
 
        store = self.get_store(self)
2363
 
        store._load_from_string('[baz]\nfoo=bar')
2364
 
        sections = list(store.get_sections())
2365
 
        self.assertLength(1, sections)
2366
 
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2367
 
 
2368
 
    def test_load_from_string_fails_for_non_empty_store(self):
2369
 
        store = self.get_store(self)
2370
 
        store._load_from_string('foo=bar')
2371
 
        self.assertRaises(AssertionError, store._load_from_string, 'bar=baz')
2372
 
 
2373
 
 
2374
 
class TestIniFileStoreContent(tests.TestCaseWithTransport):
2375
 
    """Simulate loading a config store without content of various encodings.
2376
 
 
2377
 
    All files produced by bzr are in utf8 content.
2378
 
 
2379
 
    Users may modify them manually and end up with a file that can't be
2380
 
    loaded. We need to issue proper error messages in this case.
2381
 
    """
2382
 
 
2383
 
    invalid_utf8_char = '\xff'
2384
 
 
2385
 
    def test_load_utf8(self):
2386
 
        """Ensure we can load an utf8-encoded file."""
2387
 
        t = self.get_transport()
2388
 
        # From http://pad.lv/799212
2389
 
        unicode_user = u'b\N{Euro Sign}ar'
2390
 
        unicode_content = u'user=%s' % (unicode_user,)
2391
 
        utf8_content = unicode_content.encode('utf8')
2392
 
        # Store the raw content in the config file
2393
 
        t.put_bytes('foo.conf', utf8_content)
2394
 
        store = config.IniFileStore(t, 'foo.conf')
2395
 
        store.load()
2396
 
        stack = config.Stack([store.get_sections], store)
2397
 
        self.assertEquals(unicode_user, stack.get('user'))
2398
 
 
2399
 
    def test_load_non_ascii(self):
2400
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
2401
 
        t = self.get_transport()
2402
 
        t.put_bytes('foo.conf', 'user=foo\n#%s\n' % (self.invalid_utf8_char,))
2403
 
        store = config.IniFileStore(t, 'foo.conf')
2404
 
        self.assertRaises(errors.ConfigContentError, store.load)
2405
 
 
2406
 
    def test_load_erroneous_content(self):
2407
 
        """Ensure we display a proper error on content that can't be parsed."""
2408
 
        t = self.get_transport()
2409
 
        t.put_bytes('foo.conf', '[open_section\n')
2410
 
        store = config.IniFileStore(t, 'foo.conf')
2411
 
        self.assertRaises(errors.ParseConfigError, store.load)
2412
 
 
2413
 
 
2414
 
class TestIniConfigContent(tests.TestCaseWithTransport):
2415
 
    """Simulate loading a IniBasedConfig without content of various encodings.
2416
 
 
2417
 
    All files produced by bzr are in utf8 content.
2418
 
 
2419
 
    Users may modify them manually and end up with a file that can't be
2420
 
    loaded. We need to issue proper error messages in this case.
2421
 
    """
2422
 
 
2423
 
    invalid_utf8_char = '\xff'
2424
 
 
2425
 
    def test_load_utf8(self):
2426
 
        """Ensure we can load an utf8-encoded file."""
2427
 
        # From http://pad.lv/799212
2428
 
        unicode_user = u'b\N{Euro Sign}ar'
2429
 
        unicode_content = u'user=%s' % (unicode_user,)
2430
 
        utf8_content = unicode_content.encode('utf8')
2431
 
        # Store the raw content in the config file
2432
 
        with open('foo.conf', 'wb') as f:
2433
 
            f.write(utf8_content)
2434
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2435
 
        self.assertEquals(unicode_user, conf.get_user_option('user'))
2436
 
 
2437
 
    def test_load_badly_encoded_content(self):
2438
 
        """Ensure we display a proper error on non-ascii, non utf-8 content."""
2439
 
        with open('foo.conf', 'wb') as f:
2440
 
            f.write('user=foo\n#%s\n' % (self.invalid_utf8_char,))
2441
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2442
 
        self.assertRaises(errors.ConfigContentError, conf._get_parser)
2443
 
 
2444
 
    def test_load_erroneous_content(self):
2445
 
        """Ensure we display a proper error on content that can't be parsed."""
2446
 
        with open('foo.conf', 'wb') as f:
2447
 
            f.write('[open_section\n')
2448
 
        conf = config.IniBasedConfig(file_name='foo.conf')
2449
 
        self.assertRaises(errors.ParseConfigError, conf._get_parser)
2450
 
 
2451
 
 
2452
 
class TestMutableStore(TestStore):
2453
 
 
2454
 
    scenarios = [(key, {'store_id': key, 'get_store': builder}) for key, builder
2455
 
                 in config.test_store_builder_registry.iteritems()]
2456
 
 
2457
 
    def setUp(self):
2458
 
        super(TestMutableStore, self).setUp()
2459
 
        self.transport = self.get_transport()
2460
 
 
2461
 
    def has_store(self, store):
2462
 
        store_basename = urlutils.relative_url(self.transport.external_url(),
2463
 
                                               store.external_url())
2464
 
        return self.transport.has(store_basename)
2465
 
 
2466
 
    def test_save_empty_creates_no_file(self):
2467
 
        # FIXME: There should be a better way than relying on the test
2468
 
        # parametrization to identify branch.conf -- vila 2011-0526
2469
 
        if self.store_id in ('branch', 'remote_branch'):
2470
 
            raise tests.TestNotApplicable(
2471
 
                'branch.conf is *always* created when a branch is initialized')
2472
 
        store = self.get_store(self)
2473
 
        store.save()
2474
 
        self.assertEquals(False, self.has_store(store))
2475
 
 
2476
 
    def test_save_emptied_succeeds(self):
2477
 
        store = self.get_store(self)
2478
 
        store._load_from_string('foo=bar\n')
2479
 
        section = store.get_mutable_section(None)
2480
 
        section.remove('foo')
2481
 
        store.save()
2482
 
        self.assertEquals(True, self.has_store(store))
2483
 
        modified_store = self.get_store(self)
2484
 
        sections = list(modified_store.get_sections())
2485
 
        self.assertLength(0, sections)
2486
 
 
2487
 
    def test_save_with_content_succeeds(self):
2488
 
        # FIXME: There should be a better way than relying on the test
2489
 
        # parametrization to identify branch.conf -- vila 2011-0526
2490
 
        if self.store_id in ('branch', 'remote_branch'):
2491
 
            raise tests.TestNotApplicable(
2492
 
                'branch.conf is *always* created when a branch is initialized')
2493
 
        store = self.get_store(self)
2494
 
        store._load_from_string('foo=bar\n')
2495
 
        self.assertEquals(False, self.has_store(store))
2496
 
        store.save()
2497
 
        self.assertEquals(True, self.has_store(store))
2498
 
        modified_store = self.get_store(self)
2499
 
        sections = list(modified_store.get_sections())
2500
 
        self.assertLength(1, sections)
2501
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2502
 
 
2503
 
    def test_set_option_in_empty_store(self):
2504
 
        store = self.get_store(self)
2505
 
        section = store.get_mutable_section(None)
2506
 
        section.set('foo', 'bar')
2507
 
        store.save()
2508
 
        modified_store = self.get_store(self)
2509
 
        sections = list(modified_store.get_sections())
2510
 
        self.assertLength(1, sections)
2511
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2512
 
 
2513
 
    def test_set_option_in_default_section(self):
2514
 
        store = self.get_store(self)
2515
 
        store._load_from_string('')
2516
 
        section = store.get_mutable_section(None)
2517
 
        section.set('foo', 'bar')
2518
 
        store.save()
2519
 
        modified_store = self.get_store(self)
2520
 
        sections = list(modified_store.get_sections())
2521
 
        self.assertLength(1, sections)
2522
 
        self.assertSectionContent((None, {'foo': 'bar'}), sections[0])
2523
 
 
2524
 
    def test_set_option_in_named_section(self):
2525
 
        store = self.get_store(self)
2526
 
        store._load_from_string('')
2527
 
        section = store.get_mutable_section('baz')
2528
 
        section.set('foo', 'bar')
2529
 
        store.save()
2530
 
        modified_store = self.get_store(self)
2531
 
        sections = list(modified_store.get_sections())
2532
 
        self.assertLength(1, sections)
2533
 
        self.assertSectionContent(('baz', {'foo': 'bar'}), sections[0])
2534
 
 
2535
 
    def test_load_hook(self):
2536
 
        # We first needs to ensure that the store exists
2537
 
        store = self.get_store(self)
2538
 
        section = store.get_mutable_section('baz')
2539
 
        section.set('foo', 'bar')
2540
 
        store.save()
2541
 
        # Now we can try to load it
2542
 
        store = self.get_store(self)
2543
 
        calls = []
2544
 
        def hook(*args):
2545
 
            calls.append(args)
2546
 
        config.ConfigHooks.install_named_hook('load', hook, None)
2547
 
        self.assertLength(0, calls)
2548
 
        store.load()
2549
 
        self.assertLength(1, calls)
2550
 
        self.assertEquals((store,), calls[0])
2551
 
 
2552
 
    def test_save_hook(self):
2553
 
        calls = []
2554
 
        def hook(*args):
2555
 
            calls.append(args)
2556
 
        config.ConfigHooks.install_named_hook('save', hook, None)
2557
 
        self.assertLength(0, calls)
2558
 
        store = self.get_store(self)
2559
 
        section = store.get_mutable_section('baz')
2560
 
        section.set('foo', 'bar')
2561
 
        store.save()
2562
 
        self.assertLength(1, calls)
2563
 
        self.assertEquals((store,), calls[0])
2564
 
 
2565
 
 
2566
 
class TestIniFileStore(TestStore):
2567
 
 
2568
 
    def test_loading_unknown_file_fails(self):
2569
 
        store = config.IniFileStore(self.get_transport(), 'I-do-not-exist')
2570
 
        self.assertRaises(errors.NoSuchFile, store.load)
2571
 
 
2572
 
    def test_invalid_content(self):
2573
 
        store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2574
 
        self.assertEquals(False, store.is_loaded())
2575
 
        exc = self.assertRaises(
2576
 
            errors.ParseConfigError, store._load_from_string,
2577
 
            'this is invalid !')
2578
 
        self.assertEndsWith(exc.filename, 'foo.conf')
2579
 
        # And the load failed
2580
 
        self.assertEquals(False, store.is_loaded())
2581
 
 
2582
 
    def test_get_embedded_sections(self):
2583
 
        # A more complicated example (which also shows that section names and
2584
 
        # option names share the same name space...)
2585
 
        # FIXME: This should be fixed by forbidding dicts as values ?
2586
 
        # -- vila 2011-04-05
2587
 
        store = config.IniFileStore(self.get_transport(), 'foo.conf', )
2588
 
        store._load_from_string('''
2589
 
foo=bar
2590
 
l=1,2
2591
 
[DEFAULT]
2592
 
foo_in_DEFAULT=foo_DEFAULT
2593
 
[bar]
2594
 
foo_in_bar=barbar
2595
 
[baz]
2596
 
foo_in_baz=barbaz
2597
 
[[qux]]
2598
 
foo_in_qux=quux
2599
 
''')
2600
 
        sections = list(store.get_sections())
2601
 
        self.assertLength(4, sections)
2602
 
        # The default section has no name.
2603
 
        # List values are provided as lists
2604
 
        self.assertSectionContent((None, {'foo': 'bar', 'l': ['1', '2']}),
2605
 
                                  sections[0])
2606
 
        self.assertSectionContent(
2607
 
            ('DEFAULT', {'foo_in_DEFAULT': 'foo_DEFAULT'}), sections[1])
2608
 
        self.assertSectionContent(
2609
 
            ('bar', {'foo_in_bar': 'barbar'}), sections[2])
2610
 
        # sub sections are provided as embedded dicts.
2611
 
        self.assertSectionContent(
2612
 
            ('baz', {'foo_in_baz': 'barbaz', 'qux': {'foo_in_qux': 'quux'}}),
2613
 
            sections[3])
2614
 
 
2615
 
 
2616
 
class TestLockableIniFileStore(TestStore):
2617
 
 
2618
 
    def test_create_store_in_created_dir(self):
2619
 
        self.assertPathDoesNotExist('dir')
2620
 
        t = self.get_transport('dir/subdir')
2621
 
        store = config.LockableIniFileStore(t, 'foo.conf')
2622
 
        store.get_mutable_section(None).set('foo', 'bar')
2623
 
        store.save()
2624
 
        self.assertPathExists('dir/subdir')
2625
 
 
2626
 
 
2627
 
class TestConcurrentStoreUpdates(TestStore):
2628
 
    """Test that Stores properly handle conccurent updates.
2629
 
 
2630
 
    New Store implementation may fail some of these tests but until such
2631
 
    implementations exist it's hard to properly filter them from the scenarios
2632
 
    applied here. If you encounter such a case, contact the bzr devs.
2633
 
    """
2634
 
 
2635
 
    scenarios = [(key, {'get_stack': builder}) for key, builder
2636
 
                 in config.test_stack_builder_registry.iteritems()]
2637
 
 
2638
 
    def setUp(self):
2639
 
        super(TestConcurrentStoreUpdates, self).setUp()
2640
 
        self._content = 'one=1\ntwo=2\n'
2641
 
        self.stack = self.get_stack(self)
2642
 
        if not isinstance(self.stack, config._CompatibleStack):
2643
 
            raise tests.TestNotApplicable(
2644
 
                '%s is not meant to be compatible with the old config design'
2645
 
                % (self.stack,))
2646
 
        self.stack.store._load_from_string(self._content)
2647
 
        # Flush the store
2648
 
        self.stack.store.save()
2649
 
 
2650
 
    def test_simple_read_access(self):
2651
 
        self.assertEquals('1', self.stack.get('one'))
2652
 
 
2653
 
    def test_simple_write_access(self):
2654
 
        self.stack.set('one', 'one')
2655
 
        self.assertEquals('one', self.stack.get('one'))
2656
 
 
2657
 
    def test_listen_to_the_last_speaker(self):
2658
 
        c1 = self.stack
2659
 
        c2 = self.get_stack(self)
2660
 
        c1.set('one', 'ONE')
2661
 
        c2.set('two', 'TWO')
2662
 
        self.assertEquals('ONE', c1.get('one'))
2663
 
        self.assertEquals('TWO', c2.get('two'))
2664
 
        # The second update respect the first one
2665
 
        self.assertEquals('ONE', c2.get('one'))
2666
 
 
2667
 
    def test_last_speaker_wins(self):
2668
 
        # If the same config is not shared, the same variable modified twice
2669
 
        # can only see a single result.
2670
 
        c1 = self.stack
2671
 
        c2 = self.get_stack(self)
2672
 
        c1.set('one', 'c1')
2673
 
        c2.set('one', 'c2')
2674
 
        self.assertEquals('c2', c2.get('one'))
2675
 
        # The first modification is still available until another refresh
2676
 
        # occur
2677
 
        self.assertEquals('c1', c1.get('one'))
2678
 
        c1.set('two', 'done')
2679
 
        self.assertEquals('c2', c1.get('one'))
2680
 
 
2681
 
    def test_writes_are_serialized(self):
2682
 
        c1 = self.stack
2683
 
        c2 = self.get_stack(self)
2684
 
 
2685
 
        # We spawn a thread that will pause *during* the config saving.
2686
 
        before_writing = threading.Event()
2687
 
        after_writing = threading.Event()
2688
 
        writing_done = threading.Event()
2689
 
        c1_save_without_locking_orig = c1.store.save_without_locking
2690
 
        def c1_save_without_locking():
2691
 
            before_writing.set()
2692
 
            c1_save_without_locking_orig()
2693
 
            # The lock is held. We wait for the main thread to decide when to
2694
 
            # continue
2695
 
            after_writing.wait()
2696
 
        c1.store.save_without_locking = c1_save_without_locking
2697
 
        def c1_set():
2698
 
            c1.set('one', 'c1')
2699
 
            writing_done.set()
2700
 
        t1 = threading.Thread(target=c1_set)
2701
 
        # Collect the thread after the test
2702
 
        self.addCleanup(t1.join)
2703
 
        # Be ready to unblock the thread if the test goes wrong
2704
 
        self.addCleanup(after_writing.set)
2705
 
        t1.start()
2706
 
        before_writing.wait()
2707
 
        self.assertRaises(errors.LockContention,
2708
 
                          c2.set, 'one', 'c2')
2709
 
        self.assertEquals('c1', c1.get('one'))
2710
 
        # Let the lock be released
2711
 
        after_writing.set()
2712
 
        writing_done.wait()
2713
 
        c2.set('one', 'c2')
2714
 
        self.assertEquals('c2', c2.get('one'))
2715
 
 
2716
 
    def test_read_while_writing(self):
2717
 
       c1 = self.stack
2718
 
       # We spawn a thread that will pause *during* the write
2719
 
       ready_to_write = threading.Event()
2720
 
       do_writing = threading.Event()
2721
 
       writing_done = threading.Event()
2722
 
       # We override the _save implementation so we know the store is locked
2723
 
       c1_save_without_locking_orig = c1.store.save_without_locking
2724
 
       def c1_save_without_locking():
2725
 
           ready_to_write.set()
2726
 
           # The lock is held. We wait for the main thread to decide when to
2727
 
           # continue
2728
 
           do_writing.wait()
2729
 
           c1_save_without_locking_orig()
2730
 
           writing_done.set()
2731
 
       c1.store.save_without_locking = c1_save_without_locking
2732
 
       def c1_set():
2733
 
           c1.set('one', 'c1')
2734
 
       t1 = threading.Thread(target=c1_set)
2735
 
       # Collect the thread after the test
2736
 
       self.addCleanup(t1.join)
2737
 
       # Be ready to unblock the thread if the test goes wrong
2738
 
       self.addCleanup(do_writing.set)
2739
 
       t1.start()
2740
 
       # Ensure the thread is ready to write
2741
 
       ready_to_write.wait()
2742
 
       self.assertEquals('c1', c1.get('one'))
2743
 
       # If we read during the write, we get the old value
2744
 
       c2 = self.get_stack(self)
2745
 
       self.assertEquals('1', c2.get('one'))
2746
 
       # Let the writing occur and ensure it occurred
2747
 
       do_writing.set()
2748
 
       writing_done.wait()
2749
 
       # Now we get the updated value
2750
 
       c3 = self.get_stack(self)
2751
 
       self.assertEquals('c1', c3.get('one'))
2752
 
 
2753
 
    # FIXME: It may be worth looking into removing the lock dir when it's not
2754
 
    # needed anymore and look at possible fallouts for concurrent lockers. This
2755
 
    # will matter if/when we use config files outside of bazaar directories
2756
 
    # (.bazaar or .bzr) -- vila 20110-04-11
2757
 
 
2758
 
 
2759
 
class TestSectionMatcher(TestStore):
2760
 
 
2761
 
    scenarios = [('location', {'matcher': config.LocationMatcher})]
2762
 
 
2763
 
    def get_store(self, file_name):
2764
 
        return config.IniFileStore(self.get_readonly_transport(), file_name)
2765
 
 
2766
 
    def test_no_matches_for_empty_stores(self):
2767
 
        store = self.get_store('foo.conf')
2768
 
        store._load_from_string('')
2769
 
        matcher = self.matcher(store, '/bar')
2770
 
        self.assertEquals([], list(matcher.get_sections()))
2771
 
 
2772
 
    def test_build_doesnt_load_store(self):
2773
 
        store = self.get_store('foo.conf')
2774
 
        matcher = self.matcher(store, '/bar')
2775
 
        self.assertFalse(store.is_loaded())
2776
 
 
2777
 
 
2778
 
class TestLocationSection(tests.TestCase):
2779
 
 
2780
 
    def get_section(self, options, extra_path):
2781
 
        section = config.Section('foo', options)
2782
 
        # We don't care about the length so we use '0'
2783
 
        return config.LocationSection(section, 0, extra_path)
2784
 
 
2785
 
    def test_simple_option(self):
2786
 
        section = self.get_section({'foo': 'bar'}, '')
2787
 
        self.assertEquals('bar', section.get('foo'))
2788
 
 
2789
 
    def test_option_with_extra_path(self):
2790
 
        section = self.get_section({'foo': 'bar', 'foo:policy': 'appendpath'},
2791
 
                                   'baz')
2792
 
        self.assertEquals('bar/baz', section.get('foo'))
2793
 
 
2794
 
    def test_invalid_policy(self):
2795
 
        section = self.get_section({'foo': 'bar', 'foo:policy': 'die'},
2796
 
                                   'baz')
2797
 
        # invalid policies are ignored
2798
 
        self.assertEquals('bar', section.get('foo'))
2799
 
 
2800
 
 
2801
 
class TestLocationMatcher(TestStore):
2802
 
 
2803
 
    def get_store(self, file_name):
2804
 
        return config.IniFileStore(self.get_readonly_transport(), file_name)
2805
 
 
2806
 
    def test_more_specific_sections_first(self):
2807
 
        store = self.get_store('foo.conf')
2808
 
        store._load_from_string('''
2809
 
[/foo]
2810
 
section=/foo
2811
 
[/foo/bar]
2812
 
section=/foo/bar
2813
 
''')
2814
 
        self.assertEquals(['/foo', '/foo/bar'],
2815
 
                          [section.id for section in store.get_sections()])
2816
 
        matcher = config.LocationMatcher(store, '/foo/bar/baz')
2817
 
        sections = list(matcher.get_sections())
2818
 
        self.assertEquals([3, 2],
2819
 
                          [section.length for section in sections])
2820
 
        self.assertEquals(['/foo/bar', '/foo'],
2821
 
                          [section.id for section in sections])
2822
 
        self.assertEquals(['baz', 'bar/baz'],
2823
 
                          [section.extra_path for section in sections])
2824
 
 
2825
 
    def test_appendpath_in_no_name_section(self):
2826
 
        # It's a bit weird to allow appendpath in a no-name section, but
2827
 
        # someone may found a use for it
2828
 
        store = self.get_store('foo.conf')
2829
 
        store._load_from_string('''
2830
 
foo=bar
2831
 
foo:policy = appendpath
2832
 
''')
2833
 
        matcher = config.LocationMatcher(store, 'dir/subdir')
2834
 
        sections = list(matcher.get_sections())
2835
 
        self.assertLength(1, sections)
2836
 
        self.assertEquals('bar/dir/subdir', sections[0].get('foo'))
2837
 
 
2838
 
    def test_file_urls_are_normalized(self):
2839
 
        store = self.get_store('foo.conf')
2840
 
        if sys.platform == 'win32':
2841
 
            expected_url = 'file:///C:/dir/subdir'
2842
 
            expected_location = 'C:/dir/subdir'
2843
 
        else:
2844
 
            expected_url = 'file:///dir/subdir'
2845
 
            expected_location = '/dir/subdir'
2846
 
        matcher = config.LocationMatcher(store, expected_url)
2847
 
        self.assertEquals(expected_location, matcher.location)
2848
 
 
2849
 
 
2850
 
class TestStackGet(tests.TestCase):
2851
 
 
2852
 
    # FIXME: This should be parametrized for all known Stack or dedicated
2853
 
    # paramerized tests created to avoid bloating -- vila 2011-03-31
2854
 
 
2855
 
    def test_single_config_get(self):
2856
 
        conf = dict(foo='bar')
2857
 
        conf_stack = config.Stack([conf])
2858
 
        self.assertEquals('bar', conf_stack.get('foo'))
2859
 
 
2860
 
    def test_get_with_registered_default_value(self):
2861
 
        conf_stack = config.Stack([dict()])
2862
 
        opt = config.Option('foo', default='bar')
2863
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2864
 
        config.option_registry.register('foo', opt)
2865
 
        self.assertEquals('bar', conf_stack.get('foo'))
2866
 
 
2867
 
    def test_get_without_registered_default_value(self):
2868
 
        conf_stack = config.Stack([dict()])
2869
 
        opt = config.Option('foo')
2870
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2871
 
        config.option_registry.register('foo', opt)
2872
 
        self.assertEquals(None, conf_stack.get('foo'))
2873
 
 
2874
 
    def test_get_without_default_value_for_not_registered(self):
2875
 
        conf_stack = config.Stack([dict()])
2876
 
        opt = config.Option('foo')
2877
 
        self.overrideAttr(config, 'option_registry', registry.Registry())
2878
 
        self.assertEquals(None, conf_stack.get('foo'))
2879
 
 
2880
 
    def test_get_first_definition(self):
2881
 
        conf1 = dict(foo='bar')
2882
 
        conf2 = dict(foo='baz')
2883
 
        conf_stack = config.Stack([conf1, conf2])
2884
 
        self.assertEquals('bar', conf_stack.get('foo'))
2885
 
 
2886
 
    def test_get_embedded_definition(self):
2887
 
        conf1 = dict(yy='12')
2888
 
        conf2 = config.Stack([dict(xx='42'), dict(foo='baz')])
2889
 
        conf_stack = config.Stack([conf1, conf2])
2890
 
        self.assertEquals('baz', conf_stack.get('foo'))
2891
 
 
2892
 
    def test_get_for_empty_section_callable(self):
2893
 
        conf_stack = config.Stack([lambda : []])
2894
 
        self.assertEquals(None, conf_stack.get('foo'))
2895
 
 
2896
 
    def test_get_for_broken_callable(self):
2897
 
        # Trying to use and invalid callable raises an exception on first use
2898
 
        conf_stack = config.Stack([lambda : object()])
2899
 
        self.assertRaises(TypeError, conf_stack.get, 'foo')
2900
 
 
2901
 
 
2902
 
class TestStackWithTransport(tests.TestCaseWithTransport):
2903
 
 
2904
 
    scenarios = [(key, {'get_stack': builder}) for key, builder
2905
 
                 in config.test_stack_builder_registry.iteritems()]
2906
 
 
2907
 
 
2908
 
class TestConcreteStacks(TestStackWithTransport):
2909
 
 
2910
 
    def test_build_stack(self):
2911
 
        # Just a smoke test to help debug builders
2912
 
        stack = self.get_stack(self)
2913
 
 
2914
 
 
2915
 
class TestStackGet(TestStackWithTransport):
2916
 
 
2917
 
    def test_get_for_empty_stack(self):
2918
 
        conf = self.get_stack(self)
2919
 
        self.assertEquals(None, conf.get('foo'))
2920
 
 
2921
 
    def test_get_hook(self):
2922
 
        conf = self.get_stack(self)
2923
 
        conf.store._load_from_string('foo=bar')
2924
 
        calls = []
2925
 
        def hook(*args):
2926
 
            calls.append(args)
2927
 
        config.ConfigHooks.install_named_hook('get', hook, None)
2928
 
        self.assertLength(0, calls)
2929
 
        value = conf.get('foo')
2930
 
        self.assertEquals('bar', value)
2931
 
        self.assertLength(1, calls)
2932
 
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2933
 
 
2934
 
 
2935
 
class TestStackSet(TestStackWithTransport):
2936
 
 
2937
 
    def test_simple_set(self):
2938
 
        conf = self.get_stack(self)
2939
 
        conf.store._load_from_string('foo=bar')
2940
 
        self.assertEquals('bar', conf.get('foo'))
2941
 
        conf.set('foo', 'baz')
2942
 
        # Did we get it back ?
2943
 
        self.assertEquals('baz', conf.get('foo'))
2944
 
 
2945
 
    def test_set_creates_a_new_section(self):
2946
 
        conf = self.get_stack(self)
2947
 
        conf.set('foo', 'baz')
2948
 
        self.assertEquals, 'baz', conf.get('foo')
2949
 
 
2950
 
    def test_set_hook(self):
2951
 
        calls = []
2952
 
        def hook(*args):
2953
 
            calls.append(args)
2954
 
        config.ConfigHooks.install_named_hook('set', hook, None)
2955
 
        self.assertLength(0, calls)
2956
 
        conf = self.get_stack(self)
2957
 
        conf.set('foo', 'bar')
2958
 
        self.assertLength(1, calls)
2959
 
        self.assertEquals((conf, 'foo', 'bar'), calls[0])
2960
 
 
2961
 
 
2962
 
class TestStackRemove(TestStackWithTransport):
2963
 
 
2964
 
    def test_remove_existing(self):
2965
 
        conf = self.get_stack(self)
2966
 
        conf.store._load_from_string('foo=bar')
2967
 
        self.assertEquals('bar', conf.get('foo'))
2968
 
        conf.remove('foo')
2969
 
        # Did we get it back ?
2970
 
        self.assertEquals(None, conf.get('foo'))
2971
 
 
2972
 
    def test_remove_unknown(self):
2973
 
        conf = self.get_stack(self)
2974
 
        self.assertRaises(KeyError, conf.remove, 'I_do_not_exist')
2975
 
 
2976
 
    def test_remove_hook(self):
2977
 
        calls = []
2978
 
        def hook(*args):
2979
 
            calls.append(args)
2980
 
        config.ConfigHooks.install_named_hook('remove', hook, None)
2981
 
        self.assertLength(0, calls)
2982
 
        conf = self.get_stack(self)
2983
 
        conf.store._load_from_string('foo=bar')
2984
 
        conf.remove('foo')
2985
 
        self.assertLength(1, calls)
2986
 
        self.assertEquals((conf, 'foo'), calls[0])
2987
 
 
2988
 
 
2989
1562
class TestConfigGetOptions(tests.TestCaseWithTransport, TestOptionsMixin):
2990
1563
 
2991
1564
    def setUp(self):
2992
1565
        super(TestConfigGetOptions, self).setUp()
2993
1566
        create_configs(self)
2994
1567
 
 
1568
    # One variable in none of the above
2995
1569
    def test_no_variable(self):
2996
1570
        # Using branch should query branch, locations and bazaar
2997
1571
        self.assertOptions([], self.branch_config)
3157
1731
        self.assertEquals({}, conf._get_config())
3158
1732
        self._got_user_passwd(None, None, conf, 'http', 'foo.net')
3159
1733
 
3160
 
    def test_non_utf8_config(self):
3161
 
        conf = config.AuthenticationConfig(_file=StringIO(
3162
 
                'foo = bar\xff'))
3163
 
        self.assertRaises(errors.ConfigContentError, conf._get_config)
3164
 
        
3165
1734
    def test_missing_auth_section_header(self):
3166
1735
        conf = config.AuthenticationConfig(_file=StringIO('foo = bar'))
3167
1736
        self.assertRaises(ValueError, conf.get_credentials, 'ftp', 'foo.net')
3425
1994
 
3426
1995
    def test_username_defaults_prompts(self):
3427
1996
        # HTTP prompts can't be tested here, see test_http.py
3428
 
        self._check_default_username_prompt(u'FTP %(host)s username: ', 'ftp')
3429
 
        self._check_default_username_prompt(
3430
 
            u'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
3431
 
        self._check_default_username_prompt(
3432
 
            u'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
 
1997
        self._check_default_username_prompt('FTP %(host)s username: ', 'ftp')
 
1998
        self._check_default_username_prompt(
 
1999
            'FTP %(host)s:%(port)d username: ', 'ftp', port=10020)
 
2000
        self._check_default_username_prompt(
 
2001
            'SSH %(host)s:%(port)d username: ', 'ssh', port=12345)
3433
2002
 
3434
2003
    def test_username_default_no_prompt(self):
3435
2004
        conf = config.AuthenticationConfig()
3441
2010
    def test_password_default_prompts(self):
3442
2011
        # HTTP prompts can't be tested here, see test_http.py
3443
2012
        self._check_default_password_prompt(
3444
 
            u'FTP %(user)s@%(host)s password: ', 'ftp')
3445
 
        self._check_default_password_prompt(
3446
 
            u'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
3447
 
        self._check_default_password_prompt(
3448
 
            u'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
 
2013
            'FTP %(user)s@%(host)s password: ', 'ftp')
 
2014
        self._check_default_password_prompt(
 
2015
            'FTP %(user)s@%(host)s:%(port)d password: ', 'ftp', port=10020)
 
2016
        self._check_default_password_prompt(
 
2017
            'SSH %(user)s@%(host)s:%(port)d password: ', 'ssh', port=12345)
3449
2018
        # SMTP port handling is a bit special (it's handled if embedded in the
3450
2019
        # host too)
3451
2020
        # FIXME: should we: forbid that, extend it to other schemes, leave
3452
2021
        # things as they are that's fine thank you ?
3453
 
        self._check_default_password_prompt(
3454
 
            u'SMTP %(user)s@%(host)s password: ', 'smtp')
3455
 
        self._check_default_password_prompt(
3456
 
            u'SMTP %(user)s@%(host)s password: ', 'smtp', host='bar.org:10025')
3457
 
        self._check_default_password_prompt(
3458
 
            u'SMTP %(user)s@%(host)s:%(port)d password: ', 'smtp', port=10025)
 
2022
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
 
2023
                                            'smtp')
 
2024
        self._check_default_password_prompt('SMTP %(user)s@%(host)s password: ',
 
2025
                                            'smtp', host='bar.org:10025')
 
2026
        self._check_default_password_prompt(
 
2027
            'SMTP %(user)s@%(host)s:%(port)d password: ',
 
2028
            'smtp', port=10025)
3459
2029
 
3460
2030
    def test_ssh_password_emits_warning(self):
3461
2031
        conf = config.AuthenticationConfig(_file=StringIO(
3641
2211
# test_user_prompted ?
3642
2212
class TestAuthenticationRing(tests.TestCaseWithTransport):
3643
2213
    pass
3644
 
 
3645
 
 
3646
 
class TestAutoUserId(tests.TestCase):
3647
 
    """Test inferring an automatic user name."""
3648
 
 
3649
 
    def test_auto_user_id(self):
3650
 
        """Automatic inference of user name.
3651
 
        
3652
 
        This is a bit hard to test in an isolated way, because it depends on
3653
 
        system functions that go direct to /etc or perhaps somewhere else.
3654
 
        But it's reasonable to say that on Unix, with an /etc/mailname, we ought
3655
 
        to be able to choose a user name with no configuration.
3656
 
        """
3657
 
        if sys.platform == 'win32':
3658
 
            raise tests.TestSkipped(
3659
 
                "User name inference not implemented on win32")
3660
 
        realname, address = config._auto_user_id()
3661
 
        if os.path.exists('/etc/mailname'):
3662
 
            self.assertIsNot(None, realname)
3663
 
            self.assertIsNot(None, address)
3664
 
        else:
3665
 
            self.assertEquals((None, None), (realname, address))
3666