~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Robert Collins
  • Date: 2009-04-24 05:08:51 UTC
  • mto: This revision was merged to the branch mainline in revision 4304.
  • Revision ID: robertc@robertcollins.net-20090424050851-sdfonaqerfs386t0
Reduce round trips pushing new branches substantially.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
16
16
 
17
17
"""Tests for the BzrDir facility and any format specific tests.
18
18
 
19
 
For interface contract tests, see tests/per_bzr_dir.
 
19
For interface contract tests, see tests/bzr_dir_implementations.
20
20
"""
21
21
 
22
22
import os
 
23
import os.path
 
24
from StringIO import StringIO
23
25
import subprocess
24
26
import sys
25
27
 
26
28
from bzrlib import (
27
 
    branch,
28
29
    bzrdir,
29
 
    controldir,
30
30
    errors,
31
31
    help_topics,
32
 
    lock,
33
32
    repository,
34
 
    revision as _mod_revision,
35
33
    osutils,
 
34
    symbol_versioning,
36
35
    remote,
37
 
    symbol_versioning,
38
 
    transport as _mod_transport,
39
36
    urlutils,
40
37
    win32utils,
41
 
    workingtree_3,
42
 
    workingtree_4,
 
38
    workingtree,
43
39
    )
44
40
import bzrlib.branch
45
 
from bzrlib.errors import (
46
 
    NotBranchError,
47
 
    NoColocatedBranchSupport,
48
 
    UnknownFormatError,
49
 
    UnsupportedFormatError,
50
 
    )
 
41
from bzrlib.errors import (NotBranchError,
 
42
                           UnknownFormatError,
 
43
                           UnsupportedFormatError,
 
44
                           )
51
45
from bzrlib.tests import (
52
46
    TestCase,
53
47
    TestCaseWithMemoryTransport,
54
48
    TestCaseWithTransport,
55
49
    TestSkipped,
 
50
    test_sftp_transport
56
51
    )
57
52
from bzrlib.tests import(
58
53
    http_server,
59
54
    http_utils,
60
55
    )
61
56
from bzrlib.tests.test_http import TestWithTransport_pycurl
62
 
from bzrlib.transport import (
63
 
    memory,
64
 
    pathfilter,
65
 
    )
 
57
from bzrlib.transport import get_transport
66
58
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
59
from bzrlib.transport.memory import MemoryServer
67
60
from bzrlib.transport.nosmart import NoSmartTransportDecorator
68
61
from bzrlib.transport.readonly import ReadonlyTransportDecorator
69
 
from bzrlib.repofmt import knitrepo, knitpack_repo
 
62
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
70
63
 
71
64
 
72
65
class TestDefaultFormat(TestCase):
73
66
 
74
67
    def test_get_set_default_format(self):
75
68
        old_format = bzrdir.BzrDirFormat.get_default_format()
76
 
        # default is BzrDirMetaFormat1
77
 
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
78
 
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
 
69
        # default is BzrDirFormat6
 
70
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
 
71
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
79
72
        # creating a bzr dir should now create an instrumented dir.
80
73
        try:
81
74
            result = bzrdir.BzrDir.create('memory:///')
82
 
            self.assertIsInstance(result, SampleBzrDir)
 
75
            self.failUnless(isinstance(result, SampleBzrDir))
83
76
        finally:
84
 
            controldir.ControlDirFormat._set_default_format(old_format)
 
77
            bzrdir.BzrDirFormat._set_default_format(old_format)
85
78
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
86
79
 
87
80
 
88
 
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
89
 
    """A deprecated bzr dir format."""
90
 
 
91
 
 
92
81
class TestFormatRegistry(TestCase):
93
82
 
94
83
    def make_format_registry(self):
95
 
        my_format_registry = controldir.ControlDirFormatRegistry()
96
 
        my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
97
 
            'Some format.  Slower and unawesome and deprecated.',
98
 
            deprecated=True)
99
 
        my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
100
 
            'DeprecatedBzrDirFormat', 'Format registered lazily',
101
 
            deprecated=True)
102
 
        bzrdir.register_metadir(my_format_registry, 'knit',
 
84
        my_format_registry = bzrdir.BzrDirFormatRegistry()
 
85
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
 
86
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
87
            ' repositories', deprecated=True)
 
88
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
 
89
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
 
90
        my_format_registry.register_metadir('knit',
103
91
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
104
92
            'Format using knits',
105
93
            )
106
94
        my_format_registry.set_default('knit')
107
 
        bzrdir.register_metadir(my_format_registry,
 
95
        my_format_registry.register_metadir(
108
96
            'branch6',
109
97
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
110
98
            'Experimental successor to knit.  Use at your own risk.',
111
99
            branch_format='bzrlib.branch.BzrBranchFormat6',
112
100
            experimental=True)
113
 
        bzrdir.register_metadir(my_format_registry,
 
101
        my_format_registry.register_metadir(
114
102
            'hidden format',
115
103
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
116
104
            'Experimental successor to knit.  Use at your own risk.',
117
105
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
118
 
        my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
119
 
            'Old format.  Slower and does not support things. ', hidden=True)
120
 
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
121
 
            'DeprecatedBzrDirFormat', 'Format registered lazily',
122
 
            deprecated=True, hidden=True)
 
106
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
 
107
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
108
            ' repositories', hidden=True)
 
109
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
 
110
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
 
111
            hidden=True)
123
112
        return my_format_registry
124
113
 
125
114
    def test_format_registry(self):
126
115
        my_format_registry = self.make_format_registry()
127
116
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
128
 
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
129
 
        my_bzrdir = my_format_registry.make_bzrdir('deprecated')
130
 
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
 
117
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
118
        my_bzrdir = my_format_registry.make_bzrdir('weave')
 
119
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
131
120
        my_bzrdir = my_format_registry.make_bzrdir('default')
132
121
        self.assertIsInstance(my_bzrdir.repository_format,
133
122
            knitrepo.RepositoryFormatKnit1)
146
135
                         my_format_registry.get_help('knit'))
147
136
        self.assertEqual('Format using knits',
148
137
                         my_format_registry.get_help('default'))
149
 
        self.assertEqual('Some format.  Slower and unawesome and deprecated.',
150
 
                         my_format_registry.get_help('deprecated'))
 
138
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
 
139
                         ' checkouts or shared repositories',
 
140
                         my_format_registry.get_help('weave'))
151
141
 
152
142
    def test_help_topic(self):
153
143
        topics = help_topics.HelpTopicRegistry()
159
149
        new = topics.get_detail('current-formats')
160
150
        rest = topics.get_detail('other-formats')
161
151
        experimental, deprecated = rest.split('Deprecated formats')
162
 
        self.assertContainsRe(new, 'formats-help')
 
152
        self.assertContainsRe(new, 'bzr help formats')
163
153
        self.assertContainsRe(new,
164
154
                ':knit:\n    \(native\) \(default\) Format using knits\n')
165
155
        self.assertContainsRe(experimental,
177
167
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
178
168
                          bzrdir.format_registry.get('default'))
179
169
            self.assertIs(
180
 
                repository.format_registry.get_default().__class__,
 
170
                repository.RepositoryFormat.get_default_format().__class__,
181
171
                knitrepo.RepositoryFormatKnit3)
182
172
        finally:
183
173
            bzrdir.format_registry.set_default_repository(old_default)
184
174
 
185
175
    def test_aliases(self):
186
 
        a_registry = controldir.ControlDirFormatRegistry()
187
 
        a_registry.register('deprecated', DeprecatedBzrDirFormat,
188
 
            'Old format.  Slower and does not support stuff',
189
 
            deprecated=True)
190
 
        a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
191
 
            'Old format.  Slower and does not support stuff',
192
 
            deprecated=True, alias=True)
193
 
        self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
 
176
        a_registry = bzrdir.BzrDirFormatRegistry()
 
177
        a_registry.register('weave', bzrdir.BzrDirFormat6,
 
178
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
179
            ' repositories', deprecated=True)
 
180
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
 
181
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
182
            ' repositories', deprecated=True, alias=True)
 
183
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
194
184
 
195
185
 
196
186
class SampleBranch(bzrlib.branch.Branch):
218
208
        """See BzrDir.open_repository."""
219
209
        return SampleRepository(self)
220
210
 
221
 
    def create_branch(self, name=None):
 
211
    def create_branch(self):
222
212
        """See BzrDir.create_branch."""
223
 
        if name is not None:
224
 
            raise NoColocatedBranchSupport(self)
225
213
        return SampleBranch(self)
226
214
 
227
215
    def create_workingtree(self):
253
241
        return "opened branch."
254
242
 
255
243
 
256
 
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
257
 
 
258
 
    @staticmethod
259
 
    def get_format_string():
260
 
        return "Test format 1"
261
 
 
262
 
 
263
 
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
264
 
 
265
 
    @staticmethod
266
 
    def get_format_string():
267
 
        return "Test format 2"
268
 
 
269
 
 
270
244
class TestBzrDirFormat(TestCaseWithTransport):
271
245
    """Tests for the BzrDirFormat facility."""
272
246
 
273
247
    def test_find_format(self):
274
248
        # is the right format object found for a branch?
275
249
        # create a branch with a few known format objects.
276
 
        bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
277
 
            BzrDirFormatTest1())
278
 
        self.addCleanup(bzrdir.BzrProber.formats.remove,
279
 
            BzrDirFormatTest1.get_format_string())
280
 
        bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
281
 
            BzrDirFormatTest2())
282
 
        self.addCleanup(bzrdir.BzrProber.formats.remove,
283
 
            BzrDirFormatTest2.get_format_string())
284
 
        t = self.get_transport()
 
250
        # this is not quite the same as
 
251
        t = get_transport(self.get_url())
285
252
        self.build_tree(["foo/", "bar/"], transport=t)
286
253
        def check_format(format, url):
287
254
            format.initialize(url)
288
 
            t = _mod_transport.get_transport(url)
 
255
            t = get_transport(url)
289
256
            found_format = bzrdir.BzrDirFormat.find_format(t)
290
 
            self.assertIsInstance(found_format, format.__class__)
291
 
        check_format(BzrDirFormatTest1(), "foo")
292
 
        check_format(BzrDirFormatTest2(), "bar")
 
257
            self.failUnless(isinstance(found_format, format.__class__))
 
258
        check_format(bzrdir.BzrDirFormat5(), "foo")
 
259
        check_format(bzrdir.BzrDirFormat6(), "bar")
293
260
 
294
261
    def test_find_format_nothing_there(self):
295
262
        self.assertRaises(NotBranchError,
296
263
                          bzrdir.BzrDirFormat.find_format,
297
 
                          _mod_transport.get_transport('.'))
 
264
                          get_transport('.'))
298
265
 
299
266
    def test_find_format_unknown_format(self):
300
 
        t = self.get_transport()
 
267
        t = get_transport(self.get_url())
301
268
        t.mkdir('.bzr')
302
269
        t.put_bytes('.bzr/branch-format', '')
303
270
        self.assertRaises(UnknownFormatError,
304
271
                          bzrdir.BzrDirFormat.find_format,
305
 
                          _mod_transport.get_transport('.'))
 
272
                          get_transport('.'))
306
273
 
307
274
    def test_register_unregister_format(self):
308
275
        format = SampleBzrDirFormat()
310
277
        # make a bzrdir
311
278
        format.initialize(url)
312
279
        # register a format for it.
313
 
        bzrdir.BzrProber.formats.register(format.get_format_string(), format)
 
280
        bzrdir.BzrDirFormat.register_format(format)
314
281
        # which bzrdir.Open will refuse (not supported)
315
282
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
316
283
        # which bzrdir.open_containing will refuse (not supported)
317
284
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
318
285
        # but open_downlevel will work
319
 
        t = _mod_transport.get_transport(url)
 
286
        t = get_transport(url)
320
287
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
321
288
        # unregister the format
322
 
        bzrdir.BzrProber.formats.remove(format.get_format_string())
 
289
        bzrdir.BzrDirFormat.unregister_format(format)
323
290
        # now open_downlevel should fail too.
324
291
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
325
292
 
391
358
 
392
359
    def test_create_branch_convenience_root(self):
393
360
        """Creating a branch at the root of a fs should work."""
394
 
        self.vfs_transport_factory = memory.MemoryServer
 
361
        self.vfs_transport_factory = MemoryServer
395
362
        # outside a repo the default convenience output is a repo+branch_tree
396
363
        format = bzrdir.format_registry.make_bzrdir('knit')
397
364
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
502
469
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
503
470
        # Make stackable source branch with an unstackable repo format.
504
471
        source_bzrdir = self.make_bzrdir('source')
505
 
        knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
506
 
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
507
 
            source_bzrdir)
 
472
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
 
473
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
508
474
        # Make a directory with a default stacking policy
509
475
        parent_bzrdir = self.make_bzrdir('parent')
510
476
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
603
569
 
604
570
    def setUp(self):
605
571
        super(ChrootedTests, self).setUp()
606
 
        if not self.vfs_transport_factory == memory.MemoryServer:
 
572
        if not self.vfs_transport_factory == MemoryServer:
607
573
            self.transport_readonly_server = http_server.HttpServer
608
574
 
609
575
    def local_branch_path(self, branch):
708
674
        self.assertEqual(relpath, 'baz')
709
675
 
710
676
    def test_open_containing_from_transport(self):
711
 
        self.assertRaises(NotBranchError,
712
 
            bzrdir.BzrDir.open_containing_from_transport,
713
 
            _mod_transport.get_transport(self.get_readonly_url('')))
714
 
        self.assertRaises(NotBranchError,
715
 
            bzrdir.BzrDir.open_containing_from_transport,
716
 
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
 
677
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
678
                          get_transport(self.get_readonly_url('')))
 
679
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
680
                          get_transport(self.get_readonly_url('g/p/q')))
717
681
        control = bzrdir.BzrDir.create(self.get_url())
718
682
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
719
 
            _mod_transport.get_transport(self.get_readonly_url('')))
 
683
            get_transport(self.get_readonly_url('')))
720
684
        self.assertEqual('', relpath)
721
685
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
722
 
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
 
686
            get_transport(self.get_readonly_url('g/p/q')))
723
687
        self.assertEqual('g/p/q', relpath)
724
688
 
725
689
    def test_open_containing_tree_or_branch(self):
769
733
        # transport pointing at bzrdir should give a bzrdir with root transport
770
734
        # set to the given transport
771
735
        control = bzrdir.BzrDir.create(self.get_url())
772
 
        t = self.get_transport()
773
 
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
774
 
        self.assertEqual(t.base, opened_bzrdir.root_transport.base)
 
736
        transport = get_transport(self.get_url())
 
737
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
 
738
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
775
739
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
776
740
 
777
741
    def test_open_from_transport_no_bzrdir(self):
778
 
        t = self.get_transport()
779
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
 
742
        transport = get_transport(self.get_url())
 
743
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
744
                          transport)
780
745
 
781
746
    def test_open_from_transport_bzrdir_in_parent(self):
782
747
        control = bzrdir.BzrDir.create(self.get_url())
783
 
        t = self.get_transport()
784
 
        t.mkdir('subdir')
785
 
        t = t.clone('subdir')
786
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
 
748
        transport = get_transport(self.get_url())
 
749
        transport.mkdir('subdir')
 
750
        transport = transport.clone('subdir')
 
751
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
752
                          transport)
787
753
 
788
754
    def test_sprout_recursive(self):
789
755
        tree = self.make_branch_and_tree('tree1',
798
764
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
799
765
        tree2.lock_read()
800
766
        self.addCleanup(tree2.unlock)
801
 
        self.assertPathExists('tree2/subtree/file')
 
767
        self.failUnlessExists('tree2/subtree/file')
802
768
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
803
769
 
804
770
    def test_cloning_metadir(self):
808
774
        branch = self.make_branch('branch', format='knit')
809
775
        format = branch.bzrdir.cloning_metadir()
810
776
        self.assertIsInstance(format.workingtree_format,
811
 
            workingtree_4.WorkingTreeFormat6)
 
777
            workingtree.WorkingTreeFormat3)
812
778
 
813
779
    def test_sprout_recursive_treeless(self):
814
780
        tree = self.make_branch_and_tree('tree1',
819
785
        self.build_tree(['tree1/subtree/file'])
820
786
        sub_tree.add('file')
821
787
        tree.commit('Initial commit')
822
 
        # The following line forces the orphaning to reveal bug #634470
823
 
        tree.branch.get_config().set_user_option(
824
 
            'bzr.transform.orphan_policy', 'move')
825
788
        tree.bzrdir.destroy_workingtree()
826
 
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
827
 
        # fail :-( ) -- vila 20100909
828
789
        repo = self.make_repository('repo', shared=True,
829
790
            format='dirstate-with-subtree')
830
791
        repo.set_make_working_trees(False)
831
 
        # FIXME: we just deleted the workingtree and now we want to use it ????
832
 
        # At a minimum, we should use tree.branch below (but this fails too
833
 
        # currently) or stop calling this test 'treeless'. Specifically, I've
834
 
        # turned the line below into an assertRaises when 'subtree/.bzr' is
835
 
        # orphaned and sprout tries to access the branch there (which is left
836
 
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
837
 
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
838
 
        # #634470.  -- vila 20100909
839
 
        self.assertRaises(errors.NotBranchError,
840
 
                          tree.bzrdir.sprout, 'repo/tree2')
841
 
#        self.assertPathExists('repo/tree2/subtree')
842
 
#        self.assertPathDoesNotExist('repo/tree2/subtree/file')
 
792
        tree.bzrdir.sprout('repo/tree2')
 
793
        self.failUnlessExists('repo/tree2/subtree')
 
794
        self.failIfExists('repo/tree2/subtree/file')
843
795
 
844
796
    def make_foo_bar_baz(self):
845
797
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
849
801
 
850
802
    def test_find_bzrdirs(self):
851
803
        foo, bar, baz = self.make_foo_bar_baz()
852
 
        t = self.get_transport()
853
 
        self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))
854
 
 
855
 
    def make_fake_permission_denied_transport(self, transport, paths):
856
 
        """Create a transport that raises PermissionDenied for some paths."""
857
 
        def filter(path):
858
 
            if path in paths:
859
 
                raise errors.PermissionDenied(path)
860
 
            return path
861
 
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
862
 
        path_filter_server.start_server()
863
 
        self.addCleanup(path_filter_server.stop_server)
864
 
        path_filter_transport = pathfilter.PathFilteringTransport(
865
 
            path_filter_server, '.')
866
 
        return (path_filter_server, path_filter_transport)
867
 
 
868
 
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
869
 
        """Check that each branch url ends with the given suffix."""
870
 
        for actual_bzrdir in actual_bzrdirs:
871
 
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
872
 
 
873
 
    def test_find_bzrdirs_permission_denied(self):
874
 
        foo, bar, baz = self.make_foo_bar_baz()
875
 
        t = self.get_transport()
876
 
        path_filter_server, path_filter_transport = \
877
 
            self.make_fake_permission_denied_transport(t, ['foo'])
878
 
        # local transport
879
 
        self.assertBranchUrlsEndWith('/baz/',
880
 
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
881
 
        # smart server
882
 
        smart_transport = self.make_smart_server('.',
883
 
            backing_server=path_filter_server)
884
 
        self.assertBranchUrlsEndWith('/baz/',
885
 
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
 
804
        transport = get_transport(self.get_url())
 
805
        self.assertEqualBzrdirs([baz, foo, bar],
 
806
                                bzrdir.BzrDir.find_bzrdirs(transport))
886
807
 
887
808
    def test_find_bzrdirs_list_current(self):
888
809
        def list_current(transport):
889
810
            return [s for s in transport.list_dir('') if s != 'baz']
890
811
 
891
812
        foo, bar, baz = self.make_foo_bar_baz()
892
 
        t = self.get_transport()
893
 
        self.assertEqualBzrdirs(
894
 
            [foo, bar],
895
 
            bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
 
813
        transport = get_transport(self.get_url())
 
814
        self.assertEqualBzrdirs([foo, bar],
 
815
                                bzrdir.BzrDir.find_bzrdirs(transport,
 
816
                                    list_current=list_current))
 
817
 
896
818
 
897
819
    def test_find_bzrdirs_evaluate(self):
898
820
        def evaluate(bzrdir):
904
826
                return False, bzrdir.root_transport.base
905
827
 
906
828
        foo, bar, baz = self.make_foo_bar_baz()
907
 
        t = self.get_transport()
 
829
        transport = get_transport(self.get_url())
908
830
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
909
 
                         list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
 
831
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
 
832
                                                         evaluate=evaluate)))
910
833
 
911
834
    def assertEqualBzrdirs(self, first, second):
912
835
        first = list(first)
919
842
        root = self.make_repository('', shared=True)
920
843
        foo, bar, baz = self.make_foo_bar_baz()
921
844
        qux = self.make_bzrdir('foo/qux')
922
 
        t = self.get_transport()
923
 
        branches = bzrdir.BzrDir.find_branches(t)
 
845
        transport = get_transport(self.get_url())
 
846
        branches = bzrdir.BzrDir.find_branches(transport)
924
847
        self.assertEqual(baz.root_transport.base, branches[0].base)
925
848
        self.assertEqual(foo.root_transport.base, branches[1].base)
926
849
        self.assertEqual(bar.root_transport.base, branches[2].base)
927
850
 
928
851
        # ensure this works without a top-level repo
929
 
        branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
 
852
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
930
853
        self.assertEqual(foo.root_transport.base, branches[0].base)
931
854
        self.assertEqual(bar.root_transport.base, branches[1].base)
932
855
 
933
856
 
934
 
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
935
 
 
936
 
    def test_find_bzrdirs_missing_repo(self):
937
 
        t = self.get_transport()
938
 
        arepo = self.make_repository('arepo', shared=True)
939
 
        abranch_url = arepo.user_url + '/abranch'
940
 
        abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
941
 
        t.delete_tree('arepo/.bzr')
942
 
        self.assertRaises(errors.NoRepositoryPresent,
943
 
            branch.Branch.open, abranch_url)
944
 
        self.make_branch('baz')
945
 
        for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
946
 
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
947
 
 
948
 
 
949
857
class TestMeta1DirFormat(TestCaseWithTransport):
950
858
    """Tests specific to the meta1 dir format."""
951
859
 
958
866
                         dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
959
867
        repository_base = t.clone('repository').base
960
868
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
961
 
        repository_format = repository.format_registry.get_default()
962
869
        self.assertEqual(repository_base,
963
 
                         dir.get_repository_transport(repository_format).base)
 
870
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
964
871
        checkout_base = t.clone('checkout').base
965
872
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
966
873
        self.assertEqual(checkout_base,
967
 
                         dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
 
874
                         dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
968
875
 
969
876
    def test_meta1dir_uses_lockdir(self):
970
877
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
1012
919
        self.assertEqual(2, rpc_count)
1013
920
 
1014
921
 
 
922
class TestFormat5(TestCaseWithTransport):
 
923
    """Tests specific to the version 5 bzrdir format."""
 
924
 
 
925
    def test_same_lockfiles_between_tree_repo_branch(self):
 
926
        # this checks that only a single lockfiles instance is created
 
927
        # for format 5 objects
 
928
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
929
        def check_dir_components_use_same_lock(dir):
 
930
            ctrl_1 = dir.open_repository().control_files
 
931
            ctrl_2 = dir.open_branch().control_files
 
932
            ctrl_3 = dir.open_workingtree()._control_files
 
933
            self.assertTrue(ctrl_1 is ctrl_2)
 
934
            self.assertTrue(ctrl_2 is ctrl_3)
 
935
        check_dir_components_use_same_lock(dir)
 
936
        # and if we open it normally.
 
937
        dir = bzrdir.BzrDir.open(self.get_url())
 
938
        check_dir_components_use_same_lock(dir)
 
939
 
 
940
    def test_can_convert(self):
 
941
        # format 5 dirs are convertible
 
942
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
943
        self.assertTrue(dir.can_convert_format())
 
944
 
 
945
    def test_needs_conversion(self):
 
946
        # format 5 dirs need a conversion if they are not the default,
 
947
        # and they aren't
 
948
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
949
        # don't need to convert it to itself
 
950
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
 
951
        # do need to convert it to the current default
 
952
        self.assertTrue(dir.needs_format_conversion(
 
953
            bzrdir.BzrDirFormat.get_default_format()))
 
954
 
 
955
 
 
956
class TestFormat6(TestCaseWithTransport):
 
957
    """Tests specific to the version 6 bzrdir format."""
 
958
 
 
959
    def test_same_lockfiles_between_tree_repo_branch(self):
 
960
        # this checks that only a single lockfiles instance is created
 
961
        # for format 6 objects
 
962
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
963
        def check_dir_components_use_same_lock(dir):
 
964
            ctrl_1 = dir.open_repository().control_files
 
965
            ctrl_2 = dir.open_branch().control_files
 
966
            ctrl_3 = dir.open_workingtree()._control_files
 
967
            self.assertTrue(ctrl_1 is ctrl_2)
 
968
            self.assertTrue(ctrl_2 is ctrl_3)
 
969
        check_dir_components_use_same_lock(dir)
 
970
        # and if we open it normally.
 
971
        dir = bzrdir.BzrDir.open(self.get_url())
 
972
        check_dir_components_use_same_lock(dir)
 
973
 
 
974
    def test_can_convert(self):
 
975
        # format 6 dirs are convertible
 
976
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
977
        self.assertTrue(dir.can_convert_format())
 
978
 
 
979
    def test_needs_conversion(self):
 
980
        # format 6 dirs need an conversion if they are not the default.
 
981
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
982
        self.assertTrue(dir.needs_format_conversion(
 
983
            bzrdir.BzrDirFormat.get_default_format()))
 
984
 
 
985
 
 
986
class NotBzrDir(bzrlib.bzrdir.BzrDir):
    """A non .bzr based control directory."""

    def __init__(self, transport, format):
        """Record the format and transports for a '.not' control directory.

        The base class __init__ is not invoked; the attributes are set
        directly so the control transport points at '.not'.

        :param transport: Transport of the directory containing '.not'.
        :param format: The format object that opened this directory.
        """
        self.root_transport = transport
        self._format = format
        control_transport = transport.clone('.not')
        self.transport = control_transport
 
993
 
 
994
 
 
995
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
    """A test class representing any non-.bzr based disk format."""

    def initialize_on_transport(self, transport):
        """Initialize a new .not dir in the base directory of a Transport.

        :param transport: Transport on which the '.not' directory is made.
        :return: The result of opening the transport with this format.
        """
        transport.mkdir('.not')
        return self.open(transport)

    def open(self, transport):
        """Open this directory.

        :param transport: Transport of the directory containing '.not'.
        :return: A NotBzrDir bound to transport and to this format.
        """
        return NotBzrDir(transport, self)

    @classmethod
    def _known_formats(cls):
        """Return a set holding one fresh NotBzrDirFormat instance."""
        return set([NotBzrDirFormat()])

    @classmethod
    def probe_transport(cls, transport):
        """Our format is present if the transport has a '.not' entry.

        :param transport: Transport to check for '.not'.
        :return: A new NotBzrDirFormat when transport.has('.not') is true,
            otherwise None.
        """
        if transport.has('.not'):
            return NotBzrDirFormat()
        # NOTE(review): absence is signalled by returning None rather than
        # by raising; confirm that is what the format-probing caller expects.
        return None
 
1016
 
 
1017
 
 
1018
class TestNotBzrDir(TestCaseWithTransport):
    """Tests for using the bzrdir api with a non .bzr based disk format.

    If/when one of these is in the core, we can let the implementation tests
    verify this works.
    """

    def test_create_and_find_format(self):
        """A NotBzrDir can be initialized and its format then probed for."""
        # initialize a NotBzrDir at the test url
        not_format = NotBzrDirFormat()
        created = not_format.initialize(self.get_url())
        self.assertIsInstance(created, NotBzrDir)
        # now probe for it.
        format_base = bzrlib.bzrdir.BzrDirFormat
        format_base.register_control_format(not_format)
        try:
            a_transport = get_transport(self.get_url())
            probed = format_base.find_format(a_transport)
            self.assertIsInstance(probed, NotBzrDirFormat)
        finally:
            format_base.unregister_control_format(not_format)

    def test_included_in_known_formats(self):
        """known_formats() includes NotBzrDirFormat while it is registered."""
        format_base = bzrlib.bzrdir.BzrDirFormat
        # Note: the class itself, not an instance, is registered here.
        format_base.register_control_format(NotBzrDirFormat)
        try:
            formats = format_base.known_formats()
            for candidate in formats:
                if isinstance(candidate, NotBzrDirFormat):
                    break
            else:
                self.fail("No NotBzrDirFormat in %s" % formats)
        finally:
            format_base.unregister_control_format(NotBzrDirFormat)
 
1049
 
 
1050
 
1015
1051
class NonLocalTests(TestCaseWithTransport):
1016
1052
    """Tests for bzrdir static behaviour on non local paths."""
1017
1053
 
1018
1054
    def setUp(self):
1019
1055
        super(NonLocalTests, self).setUp()
1020
 
        self.vfs_transport_factory = memory.MemoryServer
 
1056
        self.vfs_transport_factory = MemoryServer
1021
1057
 
1022
1058
    def test_create_branch_convenience(self):
1023
1059
        # outside a repo the default convenience output is a repo+branch_tree
1036
1072
            self.get_url('foo'),
1037
1073
            force_new_tree=True,
1038
1074
            format=format)
1039
 
        t = self.get_transport()
 
1075
        t = get_transport(self.get_url('.'))
1040
1076
        self.assertFalse(t.has('foo'))
1041
1077
 
1042
1078
    def test_clone(self):
1058
1094
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1059
1095
        checkout_format = my_bzrdir.checkout_metadir()
1060
1096
        self.assertIsInstance(checkout_format.workingtree_format,
1061
 
                              workingtree_4.WorkingTreeFormat4)
 
1097
                              workingtree.WorkingTreeFormat3)
1062
1098
 
1063
1099
 
1064
1100
class TestHTTPRedirections(object):
1073
1109
    """
1074
1110
 
1075
1111
    def create_transport_readonly_server(self):
1076
 
        # We don't set the http protocol version, relying on the default
1077
1112
        return http_utils.HTTPServerRedirecting()
1078
1113
 
1079
1114
    def create_transport_secondary_server(self):
1080
 
        # We don't set the http protocol version, relying on the default
1081
1115
        return http_utils.HTTPServerRedirecting()
1082
1116
 
1083
1117
    def setUp(self):
1122
1156
    _transport = HttpTransport_urllib
1123
1157
 
1124
1158
    def _qualified_url(self, host, port):
1125
 
        result = 'http+urllib://%s:%s' % (host, port)
1126
 
        self.permit_url(result)
1127
 
        return result
 
1159
        return 'http+urllib://%s:%s' % (host, port)
1128
1160
 
1129
1161
 
1130
1162
 
1134
1166
    """Tests redirections for pycurl implementation"""
1135
1167
 
1136
1168
    def _qualified_url(self, host, port):
1137
 
        result = 'http+pycurl://%s:%s' % (host, port)
1138
 
        self.permit_url(result)
1139
 
        return result
 
1169
        return 'http+pycurl://%s:%s' % (host, port)
1140
1170
 
1141
1171
 
1142
1172
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1146
1176
    _transport = NoSmartTransportDecorator
1147
1177
 
1148
1178
    def _qualified_url(self, host, port):
1149
 
        result = 'nosmart+http://%s:%s' % (host, port)
1150
 
        self.permit_url(result)
1151
 
        return result
 
1179
        return 'nosmart+http://%s:%s' % (host, port)
1152
1180
 
1153
1181
 
1154
1182
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1158
1186
    _transport = ReadonlyTransportDecorator
1159
1187
 
1160
1188
    def _qualified_url(self, host, port):
1161
 
        result = 'readonly+http://%s:%s' % (host, port)
1162
 
        self.permit_url(result)
1163
 
        return result
 
1189
        return 'readonly+http://%s:%s' % (host, port)
1164
1190
 
1165
1191
 
1166
1192
class TestDotBzrHidden(TestCaseWithTransport):
1238
1264
    def copy_content_into(self, destination, revision_id=None):
1239
1265
        self.calls.append('copy_content_into')
1240
1266
 
1241
 
    def last_revision(self):
1242
 
        return _mod_revision.NULL_REVISION
1243
 
 
1244
1267
    def get_parent(self):
1245
1268
        return self._parent
1246
1269
 
1247
1270
    def set_parent(self, parent):
1248
1271
        self._parent = parent
1249
1272
 
1250
 
    def lock_read(self):
1251
 
        return lock.LogicalLockResult(self.unlock)
1252
 
 
1253
 
    def unlock(self):
1254
 
        return
1255
 
 
1256
1273
 
1257
1274
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1258
1275
 
1312
1329
        url = transport.base
1313
1330
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1314
1331
        self.assertEqual('fail', err._preformatted_string)
1315
 
 
1316
 
    def test_post_repo_init(self):
1317
 
        from bzrlib.bzrdir import RepoInitHookParams
1318
 
        calls = []
1319
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1320
 
            calls.append, None)
1321
 
        self.make_repository('foo')
1322
 
        self.assertLength(1, calls)
1323
 
        params = calls[0]
1324
 
        self.assertIsInstance(params, RepoInitHookParams)
1325
 
        self.assertTrue(hasattr(params, 'bzrdir'))
1326
 
        self.assertTrue(hasattr(params, 'repository'))
1327
 
 
1328
 
    def test_post_repo_init_hook_repr(self):
1329
 
        param_reprs = []
1330
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1331
 
            lambda params: param_reprs.append(repr(params)), None)
1332
 
        self.make_repository('foo')
1333
 
        self.assertLength(1, param_reprs)
1334
 
        param_repr = param_reprs[0]
1335
 
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1336
 
 
1337
 
 
1338
 
class TestGenerateBackupName(TestCaseWithMemoryTransport):
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
    # moved to per_bzrdir or per_transport for better coverage ?
    # -- vila 20100909

    def setUp(self):
        """Create a bzrdir on the test transport and open it for the tests."""
        super(TestGenerateBackupName, self).setUp()
        self._transport = self.get_transport()
        bzrdir.BzrDir.create(self.get_url(),
            possible_transports=[self._transport])
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)

    def test_deprecated_generate_backup_name(self):
        """generate_backup_name is called as deprecated in 2.3.0."""
        # The returned name was never inspected, so it is not bound to a
        # local; applyDeprecated alone carries the check.
        self.applyDeprecated(
                symbol_versioning.deprecated_in((2, 3, 0)),
                self._bzrdir.generate_backup_name, 'whatever')

    def test_new(self):
        """With nothing put on the transport, 'a' yields 'a.~1~'."""
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))

    # NOTE(review): the name is presumably a typo for test_existing; it is
    # left unchanged so test selection by name keeps working.
    def test_exiting(self):
        """When 'a.~1~' already exists, 'a' yields 'a.~2~'."""
        self._transport.put_bytes("a.~1~", "some content")
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
1361