~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

(gz) Backslash escape selftest output when printing to non-unicode consoles
 (Martin [gz])

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
24
24
import sys
25
25
 
26
26
from bzrlib import (
 
27
    branch,
27
28
    bzrdir,
 
29
    controldir,
28
30
    errors,
29
31
    help_topics,
30
32
    repository,
31
33
    osutils,
32
34
    remote,
 
35
    symbol_versioning,
33
36
    urlutils,
34
37
    win32utils,
35
38
    workingtree,
36
39
    )
37
40
import bzrlib.branch
38
41
from bzrlib.errors import (NotBranchError,
 
42
                           NoColocatedBranchSupport,
39
43
                           UnknownFormatError,
40
44
                           UnsupportedFormatError,
41
45
                           )
50
54
    http_utils,
51
55
    )
52
56
from bzrlib.tests.test_http import TestWithTransport_pycurl
53
 
from bzrlib.transport import get_transport
 
57
from bzrlib.transport import (
 
58
    get_transport,
 
59
    memory,
 
60
    pathfilter,
 
61
    )
54
62
from bzrlib.transport.http._urllib import HttpTransport_urllib
55
 
from bzrlib.transport.memory import MemoryServer
56
63
from bzrlib.transport.nosmart import NoSmartTransportDecorator
57
64
from bzrlib.transport.readonly import ReadonlyTransportDecorator
58
65
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
64
71
        old_format = bzrdir.BzrDirFormat.get_default_format()
65
72
        # default is BzrDirMetaFormat1
66
73
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
67
 
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
74
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
68
75
        # creating a bzr dir should now create an instrumented dir.
69
76
        try:
70
77
            result = bzrdir.BzrDir.create('memory:///')
71
78
            self.failUnless(isinstance(result, SampleBzrDir))
72
79
        finally:
73
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
80
            controldir.ControlDirFormat._set_default_format(old_format)
74
81
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
75
82
 
76
83
 
77
84
class TestFormatRegistry(TestCase):
78
85
 
79
86
    def make_format_registry(self):
80
 
        my_format_registry = bzrdir.BzrDirFormatRegistry()
 
87
        my_format_registry = controldir.ControlDirFormatRegistry()
81
88
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
82
89
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
83
90
            ' repositories', deprecated=True)
84
91
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
85
92
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
86
 
        my_format_registry.register_metadir('knit',
 
93
        bzrdir.register_metadir(my_format_registry, 'knit',
87
94
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
88
95
            'Format using knits',
89
96
            )
90
97
        my_format_registry.set_default('knit')
91
 
        my_format_registry.register_metadir(
 
98
        bzrdir.register_metadir(my_format_registry,
92
99
            'branch6',
93
100
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
94
101
            'Experimental successor to knit.  Use at your own risk.',
95
102
            branch_format='bzrlib.branch.BzrBranchFormat6',
96
103
            experimental=True)
97
 
        my_format_registry.register_metadir(
 
104
        bzrdir.register_metadir(my_format_registry,
98
105
            'hidden format',
99
106
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
100
107
            'Experimental successor to knit.  Use at your own risk.',
169
176
            bzrdir.format_registry.set_default_repository(old_default)
170
177
 
171
178
    def test_aliases(self):
172
 
        a_registry = bzrdir.BzrDirFormatRegistry()
 
179
        a_registry = controldir.ControlDirFormatRegistry()
173
180
        a_registry.register('weave', bzrdir.BzrDirFormat6,
174
181
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
175
182
            ' repositories', deprecated=True)
204
211
        """See BzrDir.open_repository."""
205
212
        return SampleRepository(self)
206
213
 
207
 
    def create_branch(self):
 
214
    def create_branch(self, name=None):
208
215
        """See BzrDir.create_branch."""
 
216
        if name is not None:
 
217
            raise NoColocatedBranchSupport(self)
209
218
        return SampleBranch(self)
210
219
 
211
220
    def create_workingtree(self):
354
363
 
355
364
    def test_create_branch_convenience_root(self):
356
365
        """Creating a branch at the root of a fs should work."""
357
 
        self.vfs_transport_factory = MemoryServer
 
366
        self.vfs_transport_factory = memory.MemoryServer
358
367
        # outside a repo the default convenience output is a repo+branch_tree
359
368
        format = bzrdir.format_registry.make_bzrdir('knit')
360
369
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
466
475
        # Make stackable source branch with an unstackable repo format.
467
476
        source_bzrdir = self.make_bzrdir('source')
468
477
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
469
 
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
 
478
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
 
479
            source_bzrdir)
470
480
        # Make a directory with a default stacking policy
471
481
        parent_bzrdir = self.make_bzrdir('parent')
472
482
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
565
575
 
566
576
    def setUp(self):
567
577
        super(ChrootedTests, self).setUp()
568
 
        if not self.vfs_transport_factory == MemoryServer:
 
578
        if not self.vfs_transport_factory == memory.MemoryServer:
569
579
            self.transport_readonly_server = http_server.HttpServer
570
580
 
571
581
    def local_branch_path(self, branch):
781
791
        self.build_tree(['tree1/subtree/file'])
782
792
        sub_tree.add('file')
783
793
        tree.commit('Initial commit')
 
794
        # The following line forces the orphaning to reveal bug #634470
 
795
        tree.branch.get_config().set_user_option(
 
796
            'bzr.transform.orphan_policy', 'move')
784
797
        tree.bzrdir.destroy_workingtree()
 
798
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
 
799
        # fail :-( ) -- vila 20100909
785
800
        repo = self.make_repository('repo', shared=True,
786
801
            format='dirstate-with-subtree')
787
802
        repo.set_make_working_trees(False)
788
 
        tree.bzrdir.sprout('repo/tree2')
789
 
        self.failUnlessExists('repo/tree2/subtree')
790
 
        self.failIfExists('repo/tree2/subtree/file')
 
803
        # FIXME: we just deleted the workingtree and now we want to use it ????
 
804
        # At a minimum, we should use tree.branch below (but this fails too
 
805
        # currently) or stop calling this test 'treeless'. Specifically, I've
 
806
        # turned the line below into an assertRaises when 'subtree/.bzr' is
 
807
        # orphaned and sprout tries to access the branch there (which is left
 
808
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
 
809
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
 
810
        # #634470.  -- vila 20100909
 
811
        self.assertRaises(errors.NotBranchError,
 
812
                          tree.bzrdir.sprout, 'repo/tree2')
 
813
#        self.failUnlessExists('repo/tree2/subtree')
 
814
#        self.failIfExists('repo/tree2/subtree/file')
791
815
 
792
816
    def make_foo_bar_baz(self):
793
817
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
801
825
        self.assertEqualBzrdirs([baz, foo, bar],
802
826
                                bzrdir.BzrDir.find_bzrdirs(transport))
803
827
 
 
828
    def make_fake_permission_denied_transport(self, transport, paths):
 
829
        """Create a transport that raises PermissionDenied for some paths."""
 
830
        def filter(path):
 
831
            if path in paths:
 
832
                raise errors.PermissionDenied(path)
 
833
            return path
 
834
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
 
835
        path_filter_server.start_server()
 
836
        self.addCleanup(path_filter_server.stop_server)
 
837
        path_filter_transport = pathfilter.PathFilteringTransport(
 
838
            path_filter_server, '.')
 
839
        return (path_filter_server, path_filter_transport)
 
840
 
 
841
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
 
842
        """Check that each branch url ends with the given suffix."""
 
843
        for actual_bzrdir in actual_bzrdirs:
 
844
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
 
845
 
 
846
    def test_find_bzrdirs_permission_denied(self):
 
847
        foo, bar, baz = self.make_foo_bar_baz()
 
848
        transport = get_transport(self.get_url())
 
849
        path_filter_server, path_filter_transport = \
 
850
            self.make_fake_permission_denied_transport(transport, ['foo'])
 
851
        # local transport
 
852
        self.assertBranchUrlsEndWith('/baz/',
 
853
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
 
854
        # smart server
 
855
        smart_transport = self.make_smart_server('.',
 
856
            backing_server=path_filter_server)
 
857
        self.assertBranchUrlsEndWith('/baz/',
 
858
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
 
859
 
804
860
    def test_find_bzrdirs_list_current(self):
805
861
        def list_current(transport):
806
862
            return [s for s in transport.list_dir('') if s != 'baz']
811
867
                                bzrdir.BzrDir.find_bzrdirs(transport,
812
868
                                    list_current=list_current))
813
869
 
814
 
 
815
870
    def test_find_bzrdirs_evaluate(self):
816
871
        def evaluate(bzrdir):
817
872
            try:
850
905
        self.assertEqual(bar.root_transport.base, branches[1].base)
851
906
 
852
907
 
 
908
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
 
909
 
 
910
    def test_find_bzrdirs_missing_repo(self):
 
911
        transport = get_transport(self.get_url())
 
912
        arepo = self.make_repository('arepo', shared=True)
 
913
        abranch_url = arepo.user_url + '/abranch'
 
914
        abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
 
915
        transport.delete_tree('arepo/.bzr')
 
916
        self.assertRaises(errors.NoRepositoryPresent,
 
917
            branch.Branch.open, abranch_url)
 
918
        self.make_branch('baz')
 
919
        for actual_bzrdir in bzrdir.BzrDir.find_branches(transport):
 
920
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
 
921
 
 
922
 
853
923
class TestMeta1DirFormat(TestCaseWithTransport):
854
924
    """Tests specific to the meta1 dir format."""
855
925
 
1004
1074
    def _known_formats(self):
1005
1075
        return set([NotBzrDirFormat()])
1006
1076
 
1007
 
    @classmethod
 
1077
 
 
1078
class NotBzrDirProber(controldir.Prober):
 
1079
 
1008
1080
    def probe_transport(self, transport):
1009
1081
        """Our format is present if the transport has a '.not' entry."""
1010
1082
        if transport.has('.not'):
1024
1096
        dir = format.initialize(self.get_url())
1025
1097
        self.assertIsInstance(dir, NotBzrDir)
1026
1098
        # now probe for it.
1027
 
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
 
1099
        controldir.ControlDirFormat.register_prober(NotBzrDirProber)
1028
1100
        try:
1029
1101
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
1030
1102
                get_transport(self.get_url()))
1031
1103
            self.assertIsInstance(found, NotBzrDirFormat)
1032
1104
        finally:
1033
 
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
 
1105
            controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
1034
1106
 
1035
1107
    def test_included_in_known_formats(self):
1036
 
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
 
1108
        not_format = NotBzrDirFormat()
 
1109
        bzrlib.controldir.ControlDirFormat.register_format(not_format)
1037
1110
        try:
1038
1111
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1039
1112
            for format in formats:
1041
1114
                    return
1042
1115
            self.fail("No NotBzrDirFormat in %s" % formats)
1043
1116
        finally:
1044
 
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
 
1117
            bzrlib.controldir.ControlDirFormat.unregister_format(not_format)
1045
1118
 
1046
1119
 
1047
1120
class NonLocalTests(TestCaseWithTransport):
1049
1122
 
1050
1123
    def setUp(self):
1051
1124
        super(NonLocalTests, self).setUp()
1052
 
        self.vfs_transport_factory = MemoryServer
 
1125
        self.vfs_transport_factory = memory.MemoryServer
1053
1126
 
1054
1127
    def test_create_branch_convenience(self):
1055
1128
        # outside a repo the default convenience output is a repo+branch_tree
1105
1178
    """
1106
1179
 
1107
1180
    def create_transport_readonly_server(self):
 
1181
        # We don't set the http protocol version, relying on the default
1108
1182
        return http_utils.HTTPServerRedirecting()
1109
1183
 
1110
1184
    def create_transport_secondary_server(self):
 
1185
        # We don't set the http protocol version, relying on the default
1111
1186
        return http_utils.HTTPServerRedirecting()
1112
1187
 
1113
1188
    def setUp(self):
1333
1408
        url = transport.base
1334
1409
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1335
1410
        self.assertEqual('fail', err._preformatted_string)
 
1411
 
 
1412
    def test_post_repo_init(self):
 
1413
        from bzrlib.bzrdir import RepoInitHookParams
 
1414
        calls = []
 
1415
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1416
            calls.append, None)
 
1417
        self.make_repository('foo')
 
1418
        self.assertLength(1, calls)
 
1419
        params = calls[0]
 
1420
        self.assertIsInstance(params, RepoInitHookParams)
 
1421
        self.assertTrue(hasattr(params, 'bzrdir'))
 
1422
        self.assertTrue(hasattr(params, 'repository'))
 
1423
 
 
1424
    def test_post_repo_init_hook_repr(self):
 
1425
        param_reprs = []
 
1426
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1427
            lambda params: param_reprs.append(repr(params)), None)
 
1428
        self.make_repository('foo')
 
1429
        self.assertLength(1, param_reprs)
 
1430
        param_repr = param_reprs[0]
 
1431
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
 
1432
 
 
1433
 
 
1434
class TestGenerateBackupName(TestCaseWithMemoryTransport):
 
1435
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
 
1436
    # moved to per_bzrdir or per_transport for better coverage ?
 
1437
    # -- vila 20100909
 
1438
 
 
1439
    def setUp(self):
 
1440
        super(TestGenerateBackupName, self).setUp()
 
1441
        self._transport = get_transport(self.get_url())
 
1442
        bzrdir.BzrDir.create(self.get_url(),
 
1443
            possible_transports=[self._transport])
 
1444
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
 
1445
 
 
1446
    def test_deprecated_generate_backup_name(self):
 
1447
        res = self.applyDeprecated(
 
1448
                symbol_versioning.deprecated_in((2, 3, 0)),
 
1449
                self._bzrdir.generate_backup_name, 'whatever')
 
1450
 
 
1451
    def test_new(self):
 
1452
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
 
1453
 
 
1454
    def test_exiting(self):
 
1455
        self._transport.put_bytes("a.~1~", "some content")
 
1456
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))