~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Martin Pool
  • Date: 2010-04-01 04:41:18 UTC
  • mto: This revision was merged to the branch mainline in revision 5128.
  • Revision ID: mbp@sourcefrog.net-20100401044118-shyctqc02ob08ngz
ignore .testrepository

Show diffs side-by-side

added added

removed removed

Lines of Context:
24
24
import sys
25
25
 
26
26
from bzrlib import (
27
 
    branch,
28
27
    bzrdir,
29
 
    controldir,
30
28
    errors,
31
29
    help_topics,
32
30
    repository,
56
54
from bzrlib.transport import (
57
55
    get_transport,
58
56
    memory,
59
 
    pathfilter,
60
57
    )
61
58
from bzrlib.transport.http._urllib import HttpTransport_urllib
62
59
from bzrlib.transport.nosmart import NoSmartTransportDecorator
70
67
        old_format = bzrdir.BzrDirFormat.get_default_format()
71
68
        # default is BzrDirMetaFormat1
72
69
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
73
 
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
 
70
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
74
71
        # creating a bzr dir should now create an instrumented dir.
75
72
        try:
76
73
            result = bzrdir.BzrDir.create('memory:///')
77
74
            self.failUnless(isinstance(result, SampleBzrDir))
78
75
        finally:
79
 
            controldir.ControlDirFormat._set_default_format(old_format)
 
76
            bzrdir.BzrDirFormat._set_default_format(old_format)
80
77
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
81
78
 
82
79
 
83
80
class TestFormatRegistry(TestCase):
84
81
 
85
82
    def make_format_registry(self):
86
 
        my_format_registry = controldir.ControlDirFormatRegistry()
 
83
        my_format_registry = bzrdir.BzrDirFormatRegistry()
87
84
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
88
85
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
89
86
            ' repositories', deprecated=True)
90
87
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
91
88
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
92
 
        bzrdir.register_metadir(my_format_registry, 'knit',
 
89
        my_format_registry.register_metadir('knit',
93
90
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
94
91
            'Format using knits',
95
92
            )
96
93
        my_format_registry.set_default('knit')
97
 
        bzrdir.register_metadir(my_format_registry,
 
94
        my_format_registry.register_metadir(
98
95
            'branch6',
99
96
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
100
97
            'Experimental successor to knit.  Use at your own risk.',
101
98
            branch_format='bzrlib.branch.BzrBranchFormat6',
102
99
            experimental=True)
103
 
        bzrdir.register_metadir(my_format_registry,
 
100
        my_format_registry.register_metadir(
104
101
            'hidden format',
105
102
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
106
103
            'Experimental successor to knit.  Use at your own risk.',
175
172
            bzrdir.format_registry.set_default_repository(old_default)
176
173
 
177
174
    def test_aliases(self):
178
 
        a_registry = controldir.ControlDirFormatRegistry()
 
175
        a_registry = bzrdir.BzrDirFormatRegistry()
179
176
        a_registry.register('weave', bzrdir.BzrDirFormat6,
180
177
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
181
178
            ' repositories', deprecated=True)
810
807
        self.assertEqualBzrdirs([baz, foo, bar],
811
808
                                bzrdir.BzrDir.find_bzrdirs(transport))
812
809
 
813
 
    def make_fake_permission_denied_transport(self, transport, paths):
814
 
        """Create a transport that raises PermissionDenied for some paths."""
815
 
        def filter(path):
816
 
            if path in paths:
817
 
                raise errors.PermissionDenied(path)
818
 
            return path
819
 
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
820
 
        path_filter_server.start_server()
821
 
        self.addCleanup(path_filter_server.stop_server)
822
 
        path_filter_transport = pathfilter.PathFilteringTransport(
823
 
            path_filter_server, '.')
824
 
        return (path_filter_server, path_filter_transport)
825
 
 
826
 
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
827
 
        """Check that each branch url ends with the given suffix."""
828
 
        for actual_bzrdir in actual_bzrdirs:
829
 
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
830
 
 
831
 
    def test_find_bzrdirs_permission_denied(self):
832
 
        foo, bar, baz = self.make_foo_bar_baz()
833
 
        transport = get_transport(self.get_url())
834
 
        path_filter_server, path_filter_transport = \
835
 
            self.make_fake_permission_denied_transport(transport, ['foo'])
836
 
        # local transport
837
 
        self.assertBranchUrlsEndWith('/baz/',
838
 
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
839
 
        # smart server
840
 
        smart_transport = self.make_smart_server('.',
841
 
            backing_server=path_filter_server)
842
 
        self.assertBranchUrlsEndWith('/baz/',
843
 
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
844
 
 
845
810
    def test_find_bzrdirs_list_current(self):
846
811
        def list_current(transport):
847
812
            return [s for s in transport.list_dir('') if s != 'baz']
852
817
                                bzrdir.BzrDir.find_bzrdirs(transport,
853
818
                                    list_current=list_current))
854
819
 
 
820
 
855
821
    def test_find_bzrdirs_evaluate(self):
856
822
        def evaluate(bzrdir):
857
823
            try:
890
856
        self.assertEqual(bar.root_transport.base, branches[1].base)
891
857
 
892
858
 
893
 
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
894
 
 
895
 
    def test_find_bzrdirs_missing_repo(self):
896
 
        transport = get_transport(self.get_url())
897
 
        arepo = self.make_repository('arepo', shared=True)
898
 
        abranch_url = arepo.user_url + '/abranch'
899
 
        abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
900
 
        transport.delete_tree('arepo/.bzr')
901
 
        self.assertRaises(errors.NoRepositoryPresent,
902
 
            branch.Branch.open, abranch_url)
903
 
        self.make_branch('baz')
904
 
        for actual_bzrdir in bzrdir.BzrDir.find_branches(transport):
905
 
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
906
 
 
907
 
 
908
859
class TestMeta1DirFormat(TestCaseWithTransport):
909
860
    """Tests specific to the meta1 dir format."""
910
861
 
1059
1010
    def _known_formats(self):
1060
1011
        return set([NotBzrDirFormat()])
1061
1012
 
1062
 
 
1063
 
class NotBzrDirProber(controldir.Prober):
1064
 
 
 
1013
    @classmethod
1065
1014
    def probe_transport(self, transport):
1066
1015
        """Our format is present if the transport has a '.not' entry."""
1067
1016
        if transport.has('.not'):
1081
1030
        dir = format.initialize(self.get_url())
1082
1031
        self.assertIsInstance(dir, NotBzrDir)
1083
1032
        # now probe for it.
1084
 
        controldir.ControlDirFormat.register_prober(NotBzrDirProber)
 
1033
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1085
1034
        try:
1086
1035
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
1087
1036
                get_transport(self.get_url()))
1088
1037
            self.assertIsInstance(found, NotBzrDirFormat)
1089
1038
        finally:
1090
 
            controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
 
1039
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1091
1040
 
1092
1041
    def test_included_in_known_formats(self):
1093
 
        not_format = NotBzrDirFormat()
1094
 
        bzrlib.controldir.ControlDirFormat.register_format(not_format)
 
1042
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1095
1043
        try:
1096
1044
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1097
1045
            for format in formats:
1099
1047
                    return
1100
1048
            self.fail("No NotBzrDirFormat in %s" % formats)
1101
1049
        finally:
1102
 
            bzrlib.controldir.ControlDirFormat.unregister_format(not_format)
 
1050
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
1103
1051
 
1104
1052
 
1105
1053
class NonLocalTests(TestCaseWithTransport):
1163
1111
    """
1164
1112
 
1165
1113
    def create_transport_readonly_server(self):
1166
 
        # We don't set the http protocol version, relying on the default
1167
1114
        return http_utils.HTTPServerRedirecting()
1168
1115
 
1169
1116
    def create_transport_secondary_server(self):
1170
 
        # We don't set the http protocol version, relying on the default
1171
1117
        return http_utils.HTTPServerRedirecting()
1172
1118
 
1173
1119
    def setUp(self):
1393
1339
        url = transport.base
1394
1340
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1395
1341
        self.assertEqual('fail', err._preformatted_string)
1396
 
 
1397
 
    def test_post_repo_init(self):
1398
 
        from bzrlib.bzrdir import RepoInitHookParams
1399
 
        calls = []
1400
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1401
 
            calls.append, None)
1402
 
        self.make_repository('foo')
1403
 
        self.assertLength(1, calls)
1404
 
        params = calls[0]
1405
 
        self.assertIsInstance(params, RepoInitHookParams)
1406
 
        self.assertTrue(hasattr(params, 'bzrdir'))
1407
 
        self.assertTrue(hasattr(params, 'repository'))
1408
 
 
1409
 
    def test_post_repo_init_hook_repr(self):
1410
 
        param_reprs = []
1411
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1412
 
            lambda params: param_reprs.append(repr(params)), None)
1413
 
        self.make_repository('foo')
1414
 
        self.assertLength(1, param_reprs)
1415
 
        param_repr = param_reprs[0]
1416
 
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1417
 
 
1418
 
 
1419
 
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1420
 
 
1421
 
    def setUp(self):
1422
 
        super(TestGenerateBackupName, self).setUp()
1423
 
        self._transport = get_transport(self.get_url())
1424
 
        bzrdir.BzrDir.create(self.get_url(),
1425
 
            possible_transports=[self._transport])
1426
 
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1427
 
 
1428
 
    def test_new(self):
1429
 
        self.assertEqual("a.~1~", self._bzrdir.generate_backup_name("a"))
1430
 
 
1431
 
    def test_exiting(self):
1432
 
        self._transport.put_bytes("a.~1~", "some content")
1433
 
        self.assertEqual("a.~2~", self._bzrdir.generate_backup_name("a"))