~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-07-12 12:36:57 UTC
  • mfrom: (1732.3.4 bzr.revnoX)
  • Revision ID: pqm@pqm.ubuntu.com-20060712123657-365eeb32b69308bf
(matthieu) revno:x:url revision spec

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the BzrDir facility and any format specific tests.
18
18
 
19
 
For interface contract tests, see tests/per_bzr_dir.
 
19
For interface contract tests, see tests/bzr_dir_implementations.
20
20
"""
21
21
 
22
 
import os
23
 
import subprocess
24
 
import sys
 
22
from StringIO import StringIO
25
23
 
26
 
from bzrlib import (
27
 
    branch,
28
 
    bzrdir,
29
 
    controldir,
30
 
    errors,
31
 
    help_topics,
32
 
    repository,
33
 
    osutils,
34
 
    remote,
35
 
    symbol_versioning,
36
 
    urlutils,
37
 
    win32utils,
38
 
    workingtree,
39
 
    )
40
24
import bzrlib.branch
 
25
import bzrlib.bzrdir as bzrdir
 
26
import bzrlib.errors as errors
41
27
from bzrlib.errors import (NotBranchError,
42
 
                           NoColocatedBranchSupport,
43
28
                           UnknownFormatError,
44
29
                           UnsupportedFormatError,
45
30
                           )
46
 
from bzrlib.tests import (
47
 
    TestCase,
48
 
    TestCaseWithMemoryTransport,
49
 
    TestCaseWithTransport,
50
 
    TestSkipped,
51
 
    )
52
 
from bzrlib.tests import(
53
 
    http_server,
54
 
    http_utils,
55
 
    )
56
 
from bzrlib.tests.test_http import TestWithTransport_pycurl
57
 
from bzrlib.transport import (
58
 
    get_transport,
59
 
    memory,
60
 
    pathfilter,
61
 
    )
62
 
from bzrlib.transport.http._urllib import HttpTransport_urllib
63
 
from bzrlib.transport.nosmart import NoSmartTransportDecorator
64
 
from bzrlib.transport.readonly import ReadonlyTransportDecorator
65
 
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
31
import bzrlib.repository as repository
 
32
from bzrlib.tests import TestCase, TestCaseWithTransport
 
33
from bzrlib.transport import get_transport
 
34
from bzrlib.transport.http import HttpServer
 
35
from bzrlib.transport.memory import MemoryServer
 
36
import bzrlib.workingtree as workingtree
66
37
 
67
38
 
68
39
class TestDefaultFormat(TestCase):
71
42
        old_format = bzrdir.BzrDirFormat.get_default_format()
72
43
        # default is BzrDirMetaFormat1
73
44
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
74
 
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
 
45
        bzrdir.BzrDirFormat.set_default_format(SampleBzrDirFormat())
75
46
        # creating a bzr dir should now create an instrumented dir.
76
47
        try:
77
48
            result = bzrdir.BzrDir.create('memory:///')
78
49
            self.failUnless(isinstance(result, SampleBzrDir))
79
50
        finally:
80
 
            controldir.ControlDirFormat._set_default_format(old_format)
 
51
            bzrdir.BzrDirFormat.set_default_format(old_format)
81
52
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
82
53
 
83
54
 
84
 
class TestFormatRegistry(TestCase):
85
 
 
86
 
    def make_format_registry(self):
87
 
        my_format_registry = controldir.ControlDirFormatRegistry()
88
 
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
89
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
90
 
            ' repositories', deprecated=True)
91
 
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
92
 
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
93
 
        bzrdir.register_metadir(my_format_registry, 'knit',
94
 
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
95
 
            'Format using knits',
96
 
            )
97
 
        my_format_registry.set_default('knit')
98
 
        bzrdir.register_metadir(my_format_registry,
99
 
            'branch6',
100
 
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
101
 
            'Experimental successor to knit.  Use at your own risk.',
102
 
            branch_format='bzrlib.branch.BzrBranchFormat6',
103
 
            experimental=True)
104
 
        bzrdir.register_metadir(my_format_registry,
105
 
            'hidden format',
106
 
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
107
 
            'Experimental successor to knit.  Use at your own risk.',
108
 
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
109
 
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
110
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
111
 
            ' repositories', hidden=True)
112
 
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
113
 
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
114
 
            hidden=True)
115
 
        return my_format_registry
116
 
 
117
 
    def test_format_registry(self):
118
 
        my_format_registry = self.make_format_registry()
119
 
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
120
 
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
121
 
        my_bzrdir = my_format_registry.make_bzrdir('weave')
122
 
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
123
 
        my_bzrdir = my_format_registry.make_bzrdir('default')
124
 
        self.assertIsInstance(my_bzrdir.repository_format,
125
 
            knitrepo.RepositoryFormatKnit1)
126
 
        my_bzrdir = my_format_registry.make_bzrdir('knit')
127
 
        self.assertIsInstance(my_bzrdir.repository_format,
128
 
            knitrepo.RepositoryFormatKnit1)
129
 
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
130
 
        self.assertIsInstance(my_bzrdir.get_branch_format(),
131
 
                              bzrlib.branch.BzrBranchFormat6)
132
 
 
133
 
    def test_get_help(self):
134
 
        my_format_registry = self.make_format_registry()
135
 
        self.assertEqual('Format registered lazily',
136
 
                         my_format_registry.get_help('lazy'))
137
 
        self.assertEqual('Format using knits',
138
 
                         my_format_registry.get_help('knit'))
139
 
        self.assertEqual('Format using knits',
140
 
                         my_format_registry.get_help('default'))
141
 
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
142
 
                         ' checkouts or shared repositories',
143
 
                         my_format_registry.get_help('weave'))
144
 
 
145
 
    def test_help_topic(self):
146
 
        topics = help_topics.HelpTopicRegistry()
147
 
        registry = self.make_format_registry()
148
 
        topics.register('current-formats', registry.help_topic,
149
 
                        'Current formats')
150
 
        topics.register('other-formats', registry.help_topic,
151
 
                        'Other formats')
152
 
        new = topics.get_detail('current-formats')
153
 
        rest = topics.get_detail('other-formats')
154
 
        experimental, deprecated = rest.split('Deprecated formats')
155
 
        self.assertContainsRe(new, 'formats-help')
156
 
        self.assertContainsRe(new,
157
 
                ':knit:\n    \(native\) \(default\) Format using knits\n')
158
 
        self.assertContainsRe(experimental,
159
 
                ':branch6:\n    \(native\) Experimental successor to knit')
160
 
        self.assertContainsRe(deprecated,
161
 
                ':lazy:\n    \(native\) Format registered lazily\n')
162
 
        self.assertNotContainsRe(new, 'hidden')
163
 
 
164
 
    def test_set_default_repository(self):
165
 
        default_factory = bzrdir.format_registry.get('default')
166
 
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
167
 
                       if v == default_factory and k != 'default'][0]
168
 
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
169
 
        try:
170
 
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
171
 
                          bzrdir.format_registry.get('default'))
172
 
            self.assertIs(
173
 
                repository.RepositoryFormat.get_default_format().__class__,
174
 
                knitrepo.RepositoryFormatKnit3)
175
 
        finally:
176
 
            bzrdir.format_registry.set_default_repository(old_default)
177
 
 
178
 
    def test_aliases(self):
179
 
        a_registry = controldir.ControlDirFormatRegistry()
180
 
        a_registry.register('weave', bzrdir.BzrDirFormat6,
181
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
182
 
            ' repositories', deprecated=True)
183
 
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
184
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
185
 
            ' repositories', deprecated=True, alias=True)
186
 
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
187
 
 
188
 
 
189
55
class SampleBranch(bzrlib.branch.Branch):
190
56
    """A dummy branch for guess what, dummy use."""
191
57
 
193
59
        self.bzrdir = dir
194
60
 
195
61
 
196
 
class SampleRepository(bzrlib.repository.Repository):
197
 
    """A dummy repo."""
198
 
 
199
 
    def __init__(self, dir):
200
 
        self.bzrdir = dir
201
 
 
202
 
 
203
62
class SampleBzrDir(bzrdir.BzrDir):
204
63
    """A sample BzrDir implementation to allow testing static methods."""
205
64
 
209
68
 
210
69
    def open_repository(self):
211
70
        """See BzrDir.open_repository."""
212
 
        return SampleRepository(self)
 
71
        return "A repository"
213
72
 
214
 
    def create_branch(self, name=None):
 
73
    def create_branch(self):
215
74
        """See BzrDir.create_branch."""
216
 
        if name is not None:
217
 
            raise NoColocatedBranchSupport(self)
218
75
        return SampleBranch(self)
219
76
 
220
77
    def create_workingtree(self):
225
82
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
226
83
    """A sample format
227
84
 
228
 
    this format is initializable, unsupported to aid in testing the
 
85
    this format is initializable, unsupported to aid in testing the 
229
86
    open and open_downlevel routines.
230
87
    """
231
88
 
233
90
        """See BzrDirFormat.get_format_string()."""
234
91
        return "Sample .bzr dir format."
235
92
 
236
 
    def initialize_on_transport(self, t):
 
93
    def initialize(self, url):
237
94
        """Create a bzr dir."""
 
95
        t = get_transport(url)
238
96
        t.mkdir('.bzr')
239
 
        t.put_bytes('.bzr/branch-format', self.get_format_string())
 
97
        t.put('.bzr/branch-format', StringIO(self.get_format_string()))
240
98
        return SampleBzrDir(t, self)
241
99
 
242
100
    def is_supported(self):
252
110
    def test_find_format(self):
253
111
        # is the right format object found for a branch?
254
112
        # create a branch with a few known format objects.
255
 
        # this is not quite the same as
 
113
        # this is not quite the same as 
256
114
        t = get_transport(self.get_url())
257
115
        self.build_tree(["foo/", "bar/"], transport=t)
258
116
        def check_format(format, url):
262
120
            self.failUnless(isinstance(found_format, format.__class__))
263
121
        check_format(bzrdir.BzrDirFormat5(), "foo")
264
122
        check_format(bzrdir.BzrDirFormat6(), "bar")
265
 
 
 
123
        
266
124
    def test_find_format_nothing_there(self):
267
125
        self.assertRaises(NotBranchError,
268
126
                          bzrdir.BzrDirFormat.find_format,
271
129
    def test_find_format_unknown_format(self):
272
130
        t = get_transport(self.get_url())
273
131
        t.mkdir('.bzr')
274
 
        t.put_bytes('.bzr/branch-format', '')
 
132
        t.put('.bzr/branch-format', StringIO())
275
133
        self.assertRaises(UnknownFormatError,
276
134
                          bzrdir.BzrDirFormat.find_format,
277
135
                          get_transport('.'))
295
153
        # now open_downlevel should fail too.
296
154
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
297
155
 
 
156
    def test_create_repository(self):
 
157
        format = SampleBzrDirFormat()
 
158
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
159
        bzrdir.BzrDirFormat.set_default_format(format)
 
160
        try:
 
161
            repo = bzrdir.BzrDir.create_repository(self.get_url())
 
162
            self.assertEqual('A repository', repo)
 
163
        finally:
 
164
            bzrdir.BzrDirFormat.set_default_format(old_format)
 
165
 
 
166
    def test_create_repository_shared(self):
 
167
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
168
        repo = bzrdir.BzrDir.create_repository('.', shared=True)
 
169
        self.assertTrue(repo.is_shared())
 
170
 
 
171
    def test_create_repository_nonshared(self):
 
172
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
173
        repo = bzrdir.BzrDir.create_repository('.')
 
174
        self.assertFalse(repo.is_shared())
 
175
 
 
176
    def test_create_repository_under_shared(self):
 
177
        # an explicit create_repository always does so.
 
178
        # we trust the format is right from the 'create_repository test'
 
179
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
180
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
181
        try:
 
182
            self.make_repository('.', shared=True)
 
183
            repo = bzrdir.BzrDir.create_repository(self.get_url('child'))
 
184
            self.assertTrue(isinstance(repo, repository.Repository))
 
185
            self.assertTrue(repo.bzrdir.root_transport.base.endswith('child/'))
 
186
        finally:
 
187
            bzrdir.BzrDirFormat.set_default_format(old_format)
 
188
 
298
189
    def test_create_branch_and_repo_uses_default(self):
299
190
        format = SampleBzrDirFormat()
300
 
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
301
 
                                                      format=format)
302
 
        self.assertTrue(isinstance(branch, SampleBranch))
 
191
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
192
        bzrdir.BzrDirFormat.set_default_format(format)
 
193
        try:
 
194
            branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url())
 
195
            self.assertTrue(isinstance(branch, SampleBranch))
 
196
        finally:
 
197
            bzrdir.BzrDirFormat.set_default_format(old_format)
303
198
 
304
199
    def test_create_branch_and_repo_under_shared(self):
305
200
        # creating a branch and repo in a shared repo uses the
306
201
        # shared repository
307
 
        format = bzrdir.format_registry.make_bzrdir('knit')
308
 
        self.make_repository('.', shared=True, format=format)
309
 
        branch = bzrdir.BzrDir.create_branch_and_repo(
310
 
            self.get_url('child'), format=format)
311
 
        self.assertRaises(errors.NoRepositoryPresent,
312
 
                          branch.bzrdir.open_repository)
 
202
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
203
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
204
        try:
 
205
            self.make_repository('.', shared=True)
 
206
            branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'))
 
207
            self.assertRaises(errors.NoRepositoryPresent,
 
208
                              branch.bzrdir.open_repository)
 
209
        finally:
 
210
            bzrdir.BzrDirFormat.set_default_format(old_format)
313
211
 
314
212
    def test_create_branch_and_repo_under_shared_force_new(self):
315
 
        # creating a branch and repo in a shared repo can be forced to
 
213
        # creating a branch and repo in a shared repo can be forced to 
316
214
        # make a new repo
317
 
        format = bzrdir.format_registry.make_bzrdir('knit')
318
 
        self.make_repository('.', shared=True, format=format)
319
 
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
320
 
                                                      force_new_repo=True,
321
 
                                                      format=format)
322
 
        branch.bzrdir.open_repository()
 
215
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
216
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
217
        try:
 
218
            self.make_repository('.', shared=True)
 
219
            branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
 
220
                                                          force_new_repo=True)
 
221
            branch.bzrdir.open_repository()
 
222
        finally:
 
223
            bzrdir.BzrDirFormat.set_default_format(old_format)
323
224
 
324
225
    def test_create_standalone_working_tree(self):
325
226
        format = SampleBzrDirFormat()
326
 
        # note this is deliberately readonly, as this failure should
327
 
        # occur before any writes.
328
 
        self.assertRaises(errors.NotLocalUrl,
329
 
                          bzrdir.BzrDir.create_standalone_workingtree,
330
 
                          self.get_readonly_url(), format=format)
331
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
332
 
                                                           format=format)
333
 
        self.assertEqual('A tree', tree)
 
227
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
228
        bzrdir.BzrDirFormat.set_default_format(format)
 
229
        try:
 
230
            # note this is deliberately readonly, as this failure should 
 
231
            # occur before any writes.
 
232
            self.assertRaises(errors.NotLocalUrl,
 
233
                              bzrdir.BzrDir.create_standalone_workingtree,
 
234
                              self.get_readonly_url())
 
235
            tree = bzrdir.BzrDir.create_standalone_workingtree('.')
 
236
            self.assertEqual('A tree', tree)
 
237
        finally:
 
238
            bzrdir.BzrDirFormat.set_default_format(old_format)
334
239
 
335
240
    def test_create_standalone_working_tree_under_shared_repo(self):
336
241
        # create standalone working tree always makes a repo.
337
 
        format = bzrdir.format_registry.make_bzrdir('knit')
338
 
        self.make_repository('.', shared=True, format=format)
339
 
        # note this is deliberately readonly, as this failure should
340
 
        # occur before any writes.
341
 
        self.assertRaises(errors.NotLocalUrl,
342
 
                          bzrdir.BzrDir.create_standalone_workingtree,
343
 
                          self.get_readonly_url('child'), format=format)
344
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
345
 
            format=format)
346
 
        tree.bzrdir.open_repository()
 
242
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
243
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
244
        try:
 
245
            self.make_repository('.', shared=True)
 
246
            # note this is deliberately readonly, as this failure should 
 
247
            # occur before any writes.
 
248
            self.assertRaises(errors.NotLocalUrl,
 
249
                              bzrdir.BzrDir.create_standalone_workingtree,
 
250
                              self.get_readonly_url('child'))
 
251
            tree = bzrdir.BzrDir.create_standalone_workingtree('child')
 
252
            tree.bzrdir.open_repository()
 
253
        finally:
 
254
            bzrdir.BzrDirFormat.set_default_format(old_format)
347
255
 
348
256
    def test_create_branch_convenience(self):
349
257
        # outside a repo the default convenience output is a repo+branch+tree
350
 
        format = bzrdir.format_registry.make_bzrdir('knit')
351
 
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
352
 
        branch.bzrdir.open_workingtree()
353
 
        branch.bzrdir.open_repository()
354
 
 
355
 
    def test_create_branch_convenience_possible_transports(self):
356
 
        """Check that the optional 'possible_transports' is recognized"""
357
 
        format = bzrdir.format_registry.make_bzrdir('knit')
358
 
        t = self.get_transport()
359
 
        branch = bzrdir.BzrDir.create_branch_convenience(
360
 
            '.', format=format, possible_transports=[t])
361
 
        branch.bzrdir.open_workingtree()
362
 
        branch.bzrdir.open_repository()
 
258
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
259
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
260
        try:
 
261
            branch = bzrdir.BzrDir.create_branch_convenience('.')
 
262
            branch.bzrdir.open_workingtree()
 
263
            branch.bzrdir.open_repository()
 
264
        finally:
 
265
            bzrdir.BzrDirFormat.set_default_format(old_format)
363
266
 
364
267
    def test_create_branch_convenience_root(self):
365
268
        """Creating a branch at the root of a fs should work."""
366
 
        self.vfs_transport_factory = memory.MemoryServer
 
269
        self.transport_server = MemoryServer
367
270
        # outside a repo the default convenience output is a repo+branch_tree
368
 
        format = bzrdir.format_registry.make_bzrdir('knit')
369
 
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
370
 
                                                         format=format)
371
 
        self.assertRaises(errors.NoWorkingTree,
372
 
                          branch.bzrdir.open_workingtree)
373
 
        branch.bzrdir.open_repository()
 
271
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
272
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
273
        try:
 
274
            branch = bzrdir.BzrDir.create_branch_convenience(self.get_url())
 
275
            self.assertRaises(errors.NoWorkingTree,
 
276
                              branch.bzrdir.open_workingtree)
 
277
            branch.bzrdir.open_repository()
 
278
        finally:
 
279
            bzrdir.BzrDirFormat.set_default_format(old_format)
374
280
 
375
281
    def test_create_branch_convenience_under_shared_repo(self):
376
282
        # inside a repo the default convenience output is a branch+ follow the
377
283
        # repo tree policy
378
 
        format = bzrdir.format_registry.make_bzrdir('knit')
379
 
        self.make_repository('.', shared=True, format=format)
380
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
381
 
            format=format)
382
 
        branch.bzrdir.open_workingtree()
383
 
        self.assertRaises(errors.NoRepositoryPresent,
384
 
                          branch.bzrdir.open_repository)
385
 
 
 
284
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
285
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
286
        try:
 
287
            self.make_repository('.', shared=True)
 
288
            branch = bzrdir.BzrDir.create_branch_convenience('child')
 
289
            branch.bzrdir.open_workingtree()
 
290
            self.assertRaises(errors.NoRepositoryPresent,
 
291
                              branch.bzrdir.open_repository)
 
292
        finally:
 
293
            bzrdir.BzrDirFormat.set_default_format(old_format)
 
294
            
386
295
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
387
296
        # inside a repo the default convenience output is a branch+ follow the
388
297
        # repo tree policy but we can override that
389
 
        format = bzrdir.format_registry.make_bzrdir('knit')
390
 
        self.make_repository('.', shared=True, format=format)
391
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
392
 
            force_new_tree=False, format=format)
393
 
        self.assertRaises(errors.NoWorkingTree,
394
 
                          branch.bzrdir.open_workingtree)
395
 
        self.assertRaises(errors.NoRepositoryPresent,
396
 
                          branch.bzrdir.open_repository)
397
 
 
 
298
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
299
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
300
        try:
 
301
            self.make_repository('.', shared=True)
 
302
            branch = bzrdir.BzrDir.create_branch_convenience('child',
 
303
                force_new_tree=False)
 
304
            self.assertRaises(errors.NoWorkingTree,
 
305
                              branch.bzrdir.open_workingtree)
 
306
            self.assertRaises(errors.NoRepositoryPresent,
 
307
                              branch.bzrdir.open_repository)
 
308
        finally:
 
309
            bzrdir.BzrDirFormat.set_default_format(old_format)
 
310
            
398
311
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
399
312
        # inside a repo the default convenience output is a branch+ follow the
400
313
        # repo tree policy
401
 
        format = bzrdir.format_registry.make_bzrdir('knit')
402
 
        repo = self.make_repository('.', shared=True, format=format)
403
 
        repo.set_make_working_trees(False)
404
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
405
 
                                                         format=format)
406
 
        self.assertRaises(errors.NoWorkingTree,
407
 
                          branch.bzrdir.open_workingtree)
408
 
        self.assertRaises(errors.NoRepositoryPresent,
409
 
                          branch.bzrdir.open_repository)
 
314
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
315
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
316
        try:
 
317
            repo = self.make_repository('.', shared=True)
 
318
            repo.set_make_working_trees(False)
 
319
            branch = bzrdir.BzrDir.create_branch_convenience('child')
 
320
            self.assertRaises(errors.NoWorkingTree,
 
321
                              branch.bzrdir.open_workingtree)
 
322
            self.assertRaises(errors.NoRepositoryPresent,
 
323
                              branch.bzrdir.open_repository)
 
324
        finally:
 
325
            bzrdir.BzrDirFormat.set_default_format(old_format)
410
326
 
411
327
    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
412
328
        # inside a repo the default convenience output is a branch+ follow the
413
329
        # repo tree policy but we can override that
414
 
        format = bzrdir.format_registry.make_bzrdir('knit')
415
 
        repo = self.make_repository('.', shared=True, format=format)
416
 
        repo.set_make_working_trees(False)
417
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
418
 
            force_new_tree=True, format=format)
419
 
        branch.bzrdir.open_workingtree()
420
 
        self.assertRaises(errors.NoRepositoryPresent,
421
 
                          branch.bzrdir.open_repository)
 
330
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
331
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
332
        try:
 
333
            repo = self.make_repository('.', shared=True)
 
334
            repo.set_make_working_trees(False)
 
335
            branch = bzrdir.BzrDir.create_branch_convenience('child',
 
336
                force_new_tree=True)
 
337
            branch.bzrdir.open_workingtree()
 
338
            self.assertRaises(errors.NoRepositoryPresent,
 
339
                              branch.bzrdir.open_repository)
 
340
        finally:
 
341
            bzrdir.BzrDirFormat.set_default_format(old_format)
422
342
 
423
343
    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
424
344
        # inside a repo the default convenience output is overridable to give
425
345
        # repo+branch+tree
426
 
        format = bzrdir.format_registry.make_bzrdir('knit')
427
 
        self.make_repository('.', shared=True, format=format)
428
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
429
 
            force_new_repo=True, format=format)
430
 
        branch.bzrdir.open_repository()
431
 
        branch.bzrdir.open_workingtree()
432
 
 
433
 
 
434
 
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
435
 
 
436
 
    def test_acquire_repository_standalone(self):
437
 
        """The default acquisition policy should create a standalone branch."""
438
 
        my_bzrdir = self.make_bzrdir('.')
439
 
        repo_policy = my_bzrdir.determine_repository_policy()
440
 
        repo, is_new = repo_policy.acquire_repository()
441
 
        self.assertEqual(repo.bzrdir.root_transport.base,
442
 
                         my_bzrdir.root_transport.base)
443
 
        self.assertFalse(repo.is_shared())
444
 
 
445
 
    def test_determine_stacking_policy(self):
446
 
        parent_bzrdir = self.make_bzrdir('.')
447
 
        child_bzrdir = self.make_bzrdir('child')
448
 
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
449
 
        repo_policy = child_bzrdir.determine_repository_policy()
450
 
        self.assertEqual('http://example.org', repo_policy._stack_on)
451
 
 
452
 
    def test_determine_stacking_policy_relative(self):
453
 
        parent_bzrdir = self.make_bzrdir('.')
454
 
        child_bzrdir = self.make_bzrdir('child')
455
 
        parent_bzrdir.get_config().set_default_stack_on('child2')
456
 
        repo_policy = child_bzrdir.determine_repository_policy()
457
 
        self.assertEqual('child2', repo_policy._stack_on)
458
 
        self.assertEqual(parent_bzrdir.root_transport.base,
459
 
                         repo_policy._stack_on_pwd)
460
 
 
461
 
    def prepare_default_stacking(self, child_format='1.6'):
462
 
        parent_bzrdir = self.make_bzrdir('.')
463
 
        child_branch = self.make_branch('child', format=child_format)
464
 
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
465
 
        new_child_transport = parent_bzrdir.transport.clone('child2')
466
 
        return child_branch, new_child_transport
467
 
 
468
 
    def test_clone_on_transport_obeys_stacking_policy(self):
469
 
        child_branch, new_child_transport = self.prepare_default_stacking()
470
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
471
 
        self.assertEqual(child_branch.base,
472
 
                         new_child.open_branch().get_stacked_on_url())
473
 
 
474
 
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
475
 
        # Make stackable source branch with an unstackable repo format.
476
 
        source_bzrdir = self.make_bzrdir('source')
477
 
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
478
 
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
479
 
            source_bzrdir)
480
 
        # Make a directory with a default stacking policy
481
 
        parent_bzrdir = self.make_bzrdir('parent')
482
 
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
483
 
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
484
 
        # Clone source into directory
485
 
        target = source_bzrdir.clone(self.get_url('parent/target'))
486
 
 
487
 
    def test_sprout_obeys_stacking_policy(self):
488
 
        child_branch, new_child_transport = self.prepare_default_stacking()
489
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
490
 
        self.assertEqual(child_branch.base,
491
 
                         new_child.open_branch().get_stacked_on_url())
492
 
 
493
 
    def test_clone_ignores_policy_for_unsupported_formats(self):
494
 
        child_branch, new_child_transport = self.prepare_default_stacking(
495
 
            child_format='pack-0.92')
496
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
497
 
        self.assertRaises(errors.UnstackableBranchFormat,
498
 
                          new_child.open_branch().get_stacked_on_url)
499
 
 
500
 
    def test_sprout_ignores_policy_for_unsupported_formats(self):
501
 
        child_branch, new_child_transport = self.prepare_default_stacking(
502
 
            child_format='pack-0.92')
503
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
504
 
        self.assertRaises(errors.UnstackableBranchFormat,
505
 
                          new_child.open_branch().get_stacked_on_url)
506
 
 
507
 
    def test_sprout_upgrades_format_if_stacked_specified(self):
508
 
        child_branch, new_child_transport = self.prepare_default_stacking(
509
 
            child_format='pack-0.92')
510
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
511
 
                                               stacked=True)
512
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
513
 
                         new_child.open_branch().get_stacked_on_url())
514
 
        repo = new_child.open_repository()
515
 
        self.assertTrue(repo._format.supports_external_lookups)
516
 
        self.assertFalse(repo.supports_rich_root())
517
 
 
518
 
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
519
 
        child_branch, new_child_transport = self.prepare_default_stacking(
520
 
            child_format='pack-0.92')
521
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
522
 
            stacked_on=child_branch.bzrdir.root_transport.base)
523
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
524
 
                         new_child.open_branch().get_stacked_on_url())
525
 
        repo = new_child.open_repository()
526
 
        self.assertTrue(repo._format.supports_external_lookups)
527
 
        self.assertFalse(repo.supports_rich_root())
528
 
 
529
 
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
530
 
        child_branch, new_child_transport = self.prepare_default_stacking(
531
 
            child_format='rich-root-pack')
532
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
533
 
                                               stacked=True)
534
 
        repo = new_child.open_repository()
535
 
        self.assertTrue(repo._format.supports_external_lookups)
536
 
        self.assertTrue(repo.supports_rich_root())
537
 
 
538
 
    def test_add_fallback_repo_handles_absolute_urls(self):
539
 
        stack_on = self.make_branch('stack_on', format='1.6')
540
 
        repo = self.make_repository('repo', format='1.6')
541
 
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
542
 
        policy._add_fallback(repo)
543
 
 
544
 
    def test_add_fallback_repo_handles_relative_urls(self):
545
 
        stack_on = self.make_branch('stack_on', format='1.6')
546
 
        repo = self.make_repository('repo', format='1.6')
547
 
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
548
 
        policy._add_fallback(repo)
549
 
 
550
 
    def test_configure_relative_branch_stacking_url(self):
551
 
        stack_on = self.make_branch('stack_on', format='1.6')
552
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
553
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
554
 
            '.', stack_on.base)
555
 
        policy.configure_branch(stacked)
556
 
        self.assertEqual('..', stacked.get_stacked_on_url())
557
 
 
558
 
    def test_relative_branch_stacking_to_absolute(self):
559
 
        stack_on = self.make_branch('stack_on', format='1.6')
560
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
561
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
562
 
            '.', self.get_readonly_url('stack_on'))
563
 
        policy.configure_branch(stacked)
564
 
        self.assertEqual(self.get_readonly_url('stack_on'),
565
 
                         stacked.get_stacked_on_url())
 
346
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
347
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
348
        try:
 
349
            self.make_repository('.', shared=True)
 
350
            branch = bzrdir.BzrDir.create_branch_convenience('child',
 
351
                force_new_repo=True)
 
352
            branch.bzrdir.open_repository()
 
353
            branch.bzrdir.open_workingtree()
 
354
        finally:
 
355
            bzrdir.BzrDirFormat.set_default_format(old_format)
566
356
 
567
357
 
568
358
class ChrootedTests(TestCaseWithTransport):
575
365
 
576
366
    def setUp(self):
577
367
        super(ChrootedTests, self).setUp()
578
 
        if not self.vfs_transport_factory == memory.MemoryServer:
579
 
            self.transport_readonly_server = http_server.HttpServer
580
 
 
581
 
    def local_branch_path(self, branch):
582
 
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
368
        if not self.transport_server == MemoryServer:
 
369
            self.transport_readonly_server = HttpServer
583
370
 
584
371
    def test_open_containing(self):
585
372
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
592
379
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
593
380
        self.assertEqual('g/p/q', relpath)
594
381
 
595
 
    def test_open_containing_tree_branch_or_repository_empty(self):
596
 
        self.assertRaises(errors.NotBranchError,
597
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
598
 
            self.get_readonly_url(''))
599
 
 
600
 
    def test_open_containing_tree_branch_or_repository_all(self):
601
 
        self.make_branch_and_tree('topdir')
602
 
        tree, branch, repo, relpath = \
603
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
604
 
                'topdir/foo')
605
 
        self.assertEqual(os.path.realpath('topdir'),
606
 
                         os.path.realpath(tree.basedir))
607
 
        self.assertEqual(os.path.realpath('topdir'),
608
 
                         self.local_branch_path(branch))
609
 
        self.assertEqual(
610
 
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
611
 
            repo.bzrdir.transport.local_abspath('repository'))
612
 
        self.assertEqual(relpath, 'foo')
613
 
 
614
 
    def test_open_containing_tree_branch_or_repository_no_tree(self):
615
 
        self.make_branch('branch')
616
 
        tree, branch, repo, relpath = \
617
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
618
 
                'branch/foo')
619
 
        self.assertEqual(tree, None)
620
 
        self.assertEqual(os.path.realpath('branch'),
621
 
                         self.local_branch_path(branch))
622
 
        self.assertEqual(
623
 
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
624
 
            repo.bzrdir.transport.local_abspath('repository'))
625
 
        self.assertEqual(relpath, 'foo')
626
 
 
627
 
    def test_open_containing_tree_branch_or_repository_repo(self):
628
 
        self.make_repository('repo')
629
 
        tree, branch, repo, relpath = \
630
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
631
 
                'repo')
632
 
        self.assertEqual(tree, None)
633
 
        self.assertEqual(branch, None)
634
 
        self.assertEqual(
635
 
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
636
 
            repo.bzrdir.transport.local_abspath('repository'))
637
 
        self.assertEqual(relpath, '')
638
 
 
639
 
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
640
 
        self.make_repository('shared', shared=True)
641
 
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
642
 
                                                force_new_tree=False)
643
 
        tree, branch, repo, relpath = \
644
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
645
 
                'shared/branch')
646
 
        self.assertEqual(tree, None)
647
 
        self.assertEqual(os.path.realpath('shared/branch'),
648
 
                         self.local_branch_path(branch))
649
 
        self.assertEqual(
650
 
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
651
 
            repo.bzrdir.transport.local_abspath('repository'))
652
 
        self.assertEqual(relpath, '')
653
 
 
654
 
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
655
 
        self.make_branch_and_tree('foo')
656
 
        self.build_tree(['foo/bar/'])
657
 
        tree, branch, repo, relpath = \
658
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
659
 
                'foo/bar')
660
 
        self.assertEqual(os.path.realpath('foo'),
661
 
                         os.path.realpath(tree.basedir))
662
 
        self.assertEqual(os.path.realpath('foo'),
663
 
                         self.local_branch_path(branch))
664
 
        self.assertEqual(
665
 
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
666
 
            repo.bzrdir.transport.local_abspath('repository'))
667
 
        self.assertEqual(relpath, 'bar')
668
 
 
669
 
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
670
 
        self.make_repository('bar')
671
 
        self.build_tree(['bar/baz/'])
672
 
        tree, branch, repo, relpath = \
673
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
674
 
                'bar/baz')
675
 
        self.assertEqual(tree, None)
676
 
        self.assertEqual(branch, None)
677
 
        self.assertEqual(
678
 
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
679
 
            repo.bzrdir.transport.local_abspath('repository'))
680
 
        self.assertEqual(relpath, 'baz')
681
 
 
682
382
    def test_open_containing_from_transport(self):
683
383
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
684
384
                          get_transport(self.get_readonly_url('')))
692
392
            get_transport(self.get_readonly_url('g/p/q')))
693
393
        self.assertEqual('g/p/q', relpath)
694
394
 
695
 
    def test_open_containing_tree_or_branch(self):
696
 
        self.make_branch_and_tree('topdir')
697
 
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
698
 
            'topdir/foo')
699
 
        self.assertEqual(os.path.realpath('topdir'),
700
 
                         os.path.realpath(tree.basedir))
701
 
        self.assertEqual(os.path.realpath('topdir'),
702
 
                         self.local_branch_path(branch))
703
 
        self.assertIs(tree.bzrdir, branch.bzrdir)
704
 
        self.assertEqual('foo', relpath)
705
 
        # opening from non-local should not return the tree
706
 
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
707
 
            self.get_readonly_url('topdir/foo'))
708
 
        self.assertEqual(None, tree)
709
 
        self.assertEqual('foo', relpath)
710
 
        # without a tree:
711
 
        self.make_branch('topdir/foo')
712
 
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
713
 
            'topdir/foo')
714
 
        self.assertIs(tree, None)
715
 
        self.assertEqual(os.path.realpath('topdir/foo'),
716
 
                         self.local_branch_path(branch))
717
 
        self.assertEqual('', relpath)
718
 
 
719
 
    def test_open_tree_or_branch(self):
720
 
        self.make_branch_and_tree('topdir')
721
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
722
 
        self.assertEqual(os.path.realpath('topdir'),
723
 
                         os.path.realpath(tree.basedir))
724
 
        self.assertEqual(os.path.realpath('topdir'),
725
 
                         self.local_branch_path(branch))
726
 
        self.assertIs(tree.bzrdir, branch.bzrdir)
727
 
        # opening from non-local should not return the tree
728
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
729
 
            self.get_readonly_url('topdir'))
730
 
        self.assertEqual(None, tree)
731
 
        # without a tree:
732
 
        self.make_branch('topdir/foo')
733
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
734
 
        self.assertIs(tree, None)
735
 
        self.assertEqual(os.path.realpath('topdir/foo'),
736
 
                         self.local_branch_path(branch))
737
 
 
738
 
    def test_open_from_transport(self):
739
 
        # transport pointing at bzrdir should give a bzrdir with root transport
740
 
        # set to the given transport
741
 
        control = bzrdir.BzrDir.create(self.get_url())
742
 
        transport = get_transport(self.get_url())
743
 
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
744
 
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
745
 
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
746
 
 
747
 
    def test_open_from_transport_no_bzrdir(self):
748
 
        transport = get_transport(self.get_url())
749
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
750
 
                          transport)
751
 
 
752
 
    def test_open_from_transport_bzrdir_in_parent(self):
753
 
        control = bzrdir.BzrDir.create(self.get_url())
754
 
        transport = get_transport(self.get_url())
755
 
        transport.mkdir('subdir')
756
 
        transport = transport.clone('subdir')
757
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
758
 
                          transport)
759
 
 
760
 
    def test_sprout_recursive(self):
761
 
        tree = self.make_branch_and_tree('tree1',
762
 
                                         format='dirstate-with-subtree')
763
 
        sub_tree = self.make_branch_and_tree('tree1/subtree',
764
 
            format='dirstate-with-subtree')
765
 
        sub_tree.set_root_id('subtree-root')
766
 
        tree.add_reference(sub_tree)
767
 
        self.build_tree(['tree1/subtree/file'])
768
 
        sub_tree.add('file')
769
 
        tree.commit('Initial commit')
770
 
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
771
 
        tree2.lock_read()
772
 
        self.addCleanup(tree2.unlock)
773
 
        self.failUnlessExists('tree2/subtree/file')
774
 
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
775
 
 
776
 
    def test_cloning_metadir(self):
777
 
        """Ensure that cloning metadir is suitable"""
778
 
        bzrdir = self.make_bzrdir('bzrdir')
779
 
        bzrdir.cloning_metadir()
780
 
        branch = self.make_branch('branch', format='knit')
781
 
        format = branch.bzrdir.cloning_metadir()
782
 
        self.assertIsInstance(format.workingtree_format,
783
 
            workingtree.WorkingTreeFormat3)
784
 
 
785
 
    def test_sprout_recursive_treeless(self):
786
 
        tree = self.make_branch_and_tree('tree1',
787
 
            format='dirstate-with-subtree')
788
 
        sub_tree = self.make_branch_and_tree('tree1/subtree',
789
 
            format='dirstate-with-subtree')
790
 
        tree.add_reference(sub_tree)
791
 
        self.build_tree(['tree1/subtree/file'])
792
 
        sub_tree.add('file')
793
 
        tree.commit('Initial commit')
794
 
        # The following line force the orhaning to reveal bug #634470
795
 
        tree.branch.get_config().set_user_option(
796
 
            'bzr.transform.orphan_policy', 'move')
797
 
        tree.bzrdir.destroy_workingtree()
798
 
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
799
 
        # fail :-( ) -- vila 20100909
800
 
        repo = self.make_repository('repo', shared=True,
801
 
            format='dirstate-with-subtree')
802
 
        repo.set_make_working_trees(False)
803
 
        # FIXME: we just deleted the workingtree and now we want to use it ????
804
 
        # At a minimum, we should use tree.branch below (but this fails too
805
 
        # currently) or stop calling this test 'treeless'. Specifically, I've
806
 
        # turn the line below into an assertRaises when 'subtree/.bzr' is
807
 
        # orphaned and sprout tries to access the branch there (which is left
808
 
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
809
 
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
810
 
        # #634470.  -- vila 20100909
811
 
        self.assertRaises(errors.NotBranchError,
812
 
                          tree.bzrdir.sprout, 'repo/tree2')
813
 
#        self.failUnlessExists('repo/tree2/subtree')
814
 
#        self.failIfExists('repo/tree2/subtree/file')
815
 
 
816
 
    def make_foo_bar_baz(self):
817
 
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
818
 
        bar = self.make_branch('foo/bar').bzrdir
819
 
        baz = self.make_branch('baz').bzrdir
820
 
        return foo, bar, baz
821
 
 
822
 
    def test_find_bzrdirs(self):
823
 
        foo, bar, baz = self.make_foo_bar_baz()
824
 
        transport = get_transport(self.get_url())
825
 
        self.assertEqualBzrdirs([baz, foo, bar],
826
 
                                bzrdir.BzrDir.find_bzrdirs(transport))
827
 
 
828
 
    def make_fake_permission_denied_transport(self, transport, paths):
829
 
        """Create a transport that raises PermissionDenied for some paths."""
830
 
        def filter(path):
831
 
            if path in paths:
832
 
                raise errors.PermissionDenied(path)
833
 
            return path
834
 
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
835
 
        path_filter_server.start_server()
836
 
        self.addCleanup(path_filter_server.stop_server)
837
 
        path_filter_transport = pathfilter.PathFilteringTransport(
838
 
            path_filter_server, '.')
839
 
        return (path_filter_server, path_filter_transport)
840
 
 
841
 
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
842
 
        """Check that each branch url ends with the given suffix."""
843
 
        for actual_bzrdir in actual_bzrdirs:
844
 
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
845
 
 
846
 
    def test_find_bzrdirs_permission_denied(self):
847
 
        foo, bar, baz = self.make_foo_bar_baz()
848
 
        transport = get_transport(self.get_url())
849
 
        path_filter_server, path_filter_transport = \
850
 
            self.make_fake_permission_denied_transport(transport, ['foo'])
851
 
        # local transport
852
 
        self.assertBranchUrlsEndWith('/baz/',
853
 
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
854
 
        # smart server
855
 
        smart_transport = self.make_smart_server('.',
856
 
            backing_server=path_filter_server)
857
 
        self.assertBranchUrlsEndWith('/baz/',
858
 
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
859
 
 
860
 
    def test_find_bzrdirs_list_current(self):
861
 
        def list_current(transport):
862
 
            return [s for s in transport.list_dir('') if s != 'baz']
863
 
 
864
 
        foo, bar, baz = self.make_foo_bar_baz()
865
 
        transport = get_transport(self.get_url())
866
 
        self.assertEqualBzrdirs([foo, bar],
867
 
                                bzrdir.BzrDir.find_bzrdirs(transport,
868
 
                                    list_current=list_current))
869
 
 
870
 
    def test_find_bzrdirs_evaluate(self):
871
 
        def evaluate(bzrdir):
872
 
            try:
873
 
                repo = bzrdir.open_repository()
874
 
            except NoRepositoryPresent:
875
 
                return True, bzrdir.root_transport.base
876
 
            else:
877
 
                return False, bzrdir.root_transport.base
878
 
 
879
 
        foo, bar, baz = self.make_foo_bar_baz()
880
 
        transport = get_transport(self.get_url())
881
 
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
882
 
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
883
 
                                                         evaluate=evaluate)))
884
 
 
885
 
    def assertEqualBzrdirs(self, first, second):
886
 
        first = list(first)
887
 
        second = list(second)
888
 
        self.assertEqual(len(first), len(second))
889
 
        for x, y in zip(first, second):
890
 
            self.assertEqual(x.root_transport.base, y.root_transport.base)
891
 
 
892
 
    def test_find_branches(self):
893
 
        root = self.make_repository('', shared=True)
894
 
        foo, bar, baz = self.make_foo_bar_baz()
895
 
        qux = self.make_bzrdir('foo/qux')
896
 
        transport = get_transport(self.get_url())
897
 
        branches = bzrdir.BzrDir.find_branches(transport)
898
 
        self.assertEqual(baz.root_transport.base, branches[0].base)
899
 
        self.assertEqual(foo.root_transport.base, branches[1].base)
900
 
        self.assertEqual(bar.root_transport.base, branches[2].base)
901
 
 
902
 
        # ensure this works without a top-level repo
903
 
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
904
 
        self.assertEqual(foo.root_transport.base, branches[0].base)
905
 
        self.assertEqual(bar.root_transport.base, branches[1].base)
906
 
 
907
 
 
908
 
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches with a branch whose shared repository was deleted."""

    def test_find_bzrdirs_missing_repo(self):
        """Every branch find_branches returns must end in '/baz/'."""
        root_transport = get_transport(self.get_url())
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        bzrdir.BzrDir.create(orphan_url).create_branch()
        # Delete the shared repository's control directory under the branch.
        root_transport.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, orphan_url)
        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(root_transport):
            self.assertEndsWith(found.user_url, '/baz/')
921
 
 
922
395
 
923
396
class TestMeta1DirFormat(TestCaseWithTransport):
924
397
    """Tests specific to the meta1 dir format."""
933
406
        repository_base = t.clone('repository').base
934
407
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
935
408
        self.assertEqual(repository_base,
936
 
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
 
409
                         dir.get_repository_transport(repository.RepositoryFormat7()).base)
937
410
        checkout_base = t.clone('checkout').base
938
411
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
939
412
        self.assertEqual(checkout_base,
945
418
        t = dir.transport
946
419
        self.assertIsDirectory('branch-lock', t)
947
420
 
948
 
    def test_comparison(self):
949
 
        """Equality and inequality behave properly.
950
 
 
951
 
        Metadirs should compare equal iff they have the same repo, branch and
952
 
        tree formats.
953
 
        """
954
 
        mydir = bzrdir.format_registry.make_bzrdir('knit')
955
 
        self.assertEqual(mydir, mydir)
956
 
        self.assertFalse(mydir != mydir)
957
 
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
958
 
        self.assertEqual(otherdir, mydir)
959
 
        self.assertFalse(otherdir != mydir)
960
 
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
961
 
        self.assertNotEqual(otherdir2, mydir)
962
 
        self.assertFalse(otherdir2 == mydir)
963
 
 
964
 
    def test_needs_conversion_different_working_tree(self):
965
 
        # meta1dirs need an conversion if any element is not the default.
966
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
967
 
        tree = self.make_branch_and_tree('tree', format='knit')
968
 
        self.assertTrue(tree.bzrdir.needs_format_conversion(
969
 
            new_format))
970
 
 
971
 
    def test_initialize_on_format_uses_smart_transport(self):
972
 
        self.setup_smart_server_with_call_log()
973
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
974
 
        transport = self.get_transport('target')
975
 
        transport.ensure_base()
976
 
        self.reset_smart_call_log()
977
 
        instance = new_format.initialize_on_transport(transport)
978
 
        self.assertIsInstance(instance, remote.RemoteBzrDir)
979
 
        rpc_count = len(self.hpss_calls)
980
 
        # This figure represent the amount of work to perform this use case. It
981
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
982
 
        # being too low. If rpc_count increases, more network roundtrips have
983
 
        # become necessary for this use case. Please do not adjust this number
984
 
        # upwards without agreement from bzr's network support maintainers.
985
 
        self.assertEqual(2, rpc_count)
986
 
 
987
 
 
 
421
        
988
422
class TestFormat5(TestCaseWithTransport):
989
423
    """Tests specific to the version 5 bzrdir format."""
990
424
 
991
425
    def test_same_lockfiles_between_tree_repo_branch(self):
992
 
        # this checks that only a single lockfiles instance is created
 
426
        # this checks that only a single lockfiles instance is created 
993
427
        # for format 5 objects
994
428
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
995
429
        def check_dir_components_use_same_lock(dir):
1002
436
        # and if we open it normally.
1003
437
        dir = bzrdir.BzrDir.open(self.get_url())
1004
438
        check_dir_components_use_same_lock(dir)
1005
 
 
 
439
    
1006
440
    def test_can_convert(self):
1007
441
        # format 5 dirs are convertable
1008
442
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
1009
443
        self.assertTrue(dir.can_convert_format())
1010
 
 
 
444
    
1011
445
    def test_needs_conversion(self):
1012
 
        # format 5 dirs need a conversion if they are not the default,
1013
 
        # and they aren't
1014
 
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
1015
 
        # don't need to convert it to itself
1016
 
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
1017
 
        # do need to convert it to the current default
1018
 
        self.assertTrue(dir.needs_format_conversion(
1019
 
            bzrdir.BzrDirFormat.get_default_format()))
 
446
        # format 5 dirs need a conversion if they are not the default.
 
447
        # and they start of not the default.
 
448
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
449
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirFormat5())
 
450
        try:
 
451
            dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
452
            self.assertFalse(dir.needs_format_conversion())
 
453
        finally:
 
454
            bzrdir.BzrDirFormat.set_default_format(old_format)
 
455
        self.assertTrue(dir.needs_format_conversion())
1020
456
 
1021
457
 
1022
458
class TestFormat6(TestCaseWithTransport):
1023
459
    """Tests specific to the version 6 bzrdir format."""
1024
460
 
1025
461
    def test_same_lockfiles_between_tree_repo_branch(self):
1026
 
        # this checks that only a single lockfiles instance is created
 
462
        # this checks that only a single lockfiles instance is created 
1027
463
        # for format 6 objects
1028
464
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1029
465
        def check_dir_components_use_same_lock(dir):
1036
472
        # and if we open it normally.
1037
473
        dir = bzrdir.BzrDir.open(self.get_url())
1038
474
        check_dir_components_use_same_lock(dir)
1039
 
 
 
475
    
1040
476
    def test_can_convert(self):
1041
477
        # format 6 dirs are convertable
1042
478
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1043
479
        self.assertTrue(dir.can_convert_format())
1044
 
 
 
480
    
1045
481
    def test_needs_conversion(self):
1046
482
        # format 6 dirs need an conversion if they are not the default.
1047
 
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
1048
 
        self.assertTrue(dir.needs_format_conversion(
1049
 
            bzrdir.BzrDirFormat.get_default_format()))
 
483
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
484
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
485
        try:
 
486
            dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
487
            self.assertTrue(dir.needs_format_conversion())
 
488
        finally:
 
489
            bzrdir.BzrDirFormat.set_default_format(old_format)
1050
490
 
1051
491
 
1052
492
class NotBzrDir(bzrlib.bzrdir.BzrDir):
1074
514
    def _known_formats(self):
1075
515
        return set([NotBzrDirFormat()])
1076
516
 
1077
 
 
1078
 
class NotBzrDirProber(controldir.Prober):
1079
 
 
 
517
    @classmethod
1080
518
    def probe_transport(self, transport):
1081
519
        """Our format is present if the transport ends in '.not/'."""
1082
520
        if transport.has('.not'):
1085
523
 
1086
524
class TestNotBzrDir(TestCaseWithTransport):
1087
525
    """Tests for using the bzrdir api with a non .bzr based disk format.
1088
 
 
 
526
    
1089
527
    If/when one of these is in the core, we can let the implementation tests
1090
528
    verify this works.
1091
529
    """
1092
530
 
1093
531
    def test_create_and_find_format(self):
1094
 
        # create a .notbzr dir
 
532
        # create a .notbzr dir 
1095
533
        format = NotBzrDirFormat()
1096
534
        dir = format.initialize(self.get_url())
1097
535
        self.assertIsInstance(dir, NotBzrDir)
1098
536
        # now probe for it.
1099
 
        controldir.ControlDirFormat.register_prober(NotBzrDirProber)
 
537
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
1100
538
        try:
1101
539
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
1102
540
                get_transport(self.get_url()))
1103
541
            self.assertIsInstance(found, NotBzrDirFormat)
1104
542
        finally:
1105
 
            controldir.ControlDirFormat.unregister_prober(NotBzrDirProber)
 
543
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1106
544
 
1107
545
    def test_included_in_known_formats(self):
1108
 
        not_format = NotBzrDirFormat()
1109
 
        bzrlib.controldir.ControlDirFormat.register_format(not_format)
 
546
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1110
547
        try:
1111
548
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1112
549
            for format in formats:
1114
551
                    return
1115
552
            self.fail("No NotBzrDirFormat in %s" % formats)
1116
553
        finally:
1117
 
            bzrlib.controldir.ControlDirFormat.unregister_format(not_format)
 
554
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
1118
555
 
1119
556
 
1120
557
class NonLocalTests(TestCaseWithTransport):
1122
559
 
1123
560
    def setUp(self):
1124
561
        super(NonLocalTests, self).setUp()
1125
 
        self.vfs_transport_factory = memory.MemoryServer
1126
 
 
 
562
        self.transport_server = MemoryServer
 
563
    
1127
564
    def test_create_branch_convenience(self):
1128
565
        # outside a repo the default convenience output is a repo+branch_tree
1129
 
        format = bzrdir.format_registry.make_bzrdir('knit')
1130
 
        branch = bzrdir.BzrDir.create_branch_convenience(
1131
 
            self.get_url('foo'), format=format)
1132
 
        self.assertRaises(errors.NoWorkingTree,
1133
 
                          branch.bzrdir.open_workingtree)
1134
 
        branch.bzrdir.open_repository()
 
566
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
567
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
568
        try:
 
569
            branch = bzrdir.BzrDir.create_branch_convenience(self.get_url('foo'))
 
570
            self.assertRaises(errors.NoWorkingTree,
 
571
                              branch.bzrdir.open_workingtree)
 
572
            branch.bzrdir.open_repository()
 
573
        finally:
 
574
            bzrdir.BzrDirFormat.set_default_format(old_format)
1135
575
 
1136
576
    def test_create_branch_convenience_force_tree_not_local_fails(self):
1137
577
        # outside a repo the default convenience output is a repo+branch_tree
1138
 
        format = bzrdir.format_registry.make_bzrdir('knit')
1139
 
        self.assertRaises(errors.NotLocalUrl,
1140
 
            bzrdir.BzrDir.create_branch_convenience,
1141
 
            self.get_url('foo'),
1142
 
            force_new_tree=True,
1143
 
            format=format)
1144
 
        t = get_transport(self.get_url('.'))
1145
 
        self.assertFalse(t.has('foo'))
 
578
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
579
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
580
        try:
 
581
            self.assertRaises(errors.NotLocalUrl,
 
582
                bzrdir.BzrDir.create_branch_convenience,
 
583
                self.get_url('foo'),
 
584
                force_new_tree=True)
 
585
            t = get_transport(self.get_url('.'))
 
586
            self.assertFalse(t.has('foo'))
 
587
        finally:
 
588
            bzrdir.BzrDirFormat.set_default_format(old_format)
1146
589
 
1147
590
    def test_clone(self):
1148
591
        # clone into a nonlocal path works
1149
 
        format = bzrdir.format_registry.make_bzrdir('knit')
1150
 
        branch = bzrdir.BzrDir.create_branch_convenience('local',
1151
 
                                                         format=format)
 
592
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
593
        bzrdir.BzrDirFormat.set_default_format(bzrdir.BzrDirMetaFormat1())
 
594
        try:
 
595
            branch = bzrdir.BzrDir.create_branch_convenience('local')
 
596
        finally:
 
597
            bzrdir.BzrDirFormat.set_default_format(old_format)
1152
598
        branch.bzrdir.open_workingtree()
1153
599
        result = branch.bzrdir.clone(self.get_url('remote'))
1154
600
        self.assertRaises(errors.NoWorkingTree,
1156
602
        result.open_branch()
1157
603
        result.open_repository()
1158
604
 
1159
 
    def test_checkout_metadir(self):
1160
 
        # checkout_metadir has reasonable working tree format even when no
1161
 
        # working tree is present
1162
 
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
1163
 
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1164
 
        checkout_format = my_bzrdir.checkout_metadir()
1165
 
        self.assertIsInstance(checkout_format.workingtree_format,
1166
 
                              workingtree.WorkingTreeFormat3)
1167
 
 
1168
 
 
1169
 
class TestHTTPRedirections(object):
1170
 
    """Test redirection between two http servers.
1171
 
 
1172
 
    This MUST be used by daughter classes that also inherit from
1173
 
    TestCaseWithTwoWebservers.
1174
 
 
1175
 
    We can't inherit directly from TestCaseWithTwoWebservers or the
1176
 
    test framework will try to create an instance which cannot
1177
 
    run, its implementation being incomplete.
1178
 
    """
1179
 
 
1180
 
    def create_transport_readonly_server(self):
1181
 
        # We don't set the http protocol version, relying on the default
1182
 
        return http_utils.HTTPServerRedirecting()
1183
 
 
1184
 
    def create_transport_secondary_server(self):
1185
 
        # We don't set the http protocol version, relying on the default
1186
 
        return http_utils.HTTPServerRedirecting()
1187
 
 
1188
 
    def setUp(self):
1189
 
        super(TestHTTPRedirections, self).setUp()
1190
 
        # The redirections will point to the new server
1191
 
        self.new_server = self.get_readonly_server()
1192
 
        # The requests to the old server will be redirected
1193
 
        self.old_server = self.get_secondary_server()
1194
 
        # Configure the redirections
1195
 
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
1196
 
 
1197
 
    def test_loop(self):
1198
 
        # Both servers redirect to each other creating a loop
1199
 
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1200
 
        # Starting from either server should loop
1201
 
        old_url = self._qualified_url(self.old_server.host,
1202
 
                                      self.old_server.port)
1203
 
        oldt = self._transport(old_url)
1204
 
        self.assertRaises(errors.NotBranchError,
1205
 
                          bzrdir.BzrDir.open_from_transport, oldt)
1206
 
        new_url = self._qualified_url(self.new_server.host,
1207
 
                                      self.new_server.port)
1208
 
        newt = self._transport(new_url)
1209
 
        self.assertRaises(errors.NotBranchError,
1210
 
                          bzrdir.BzrDir.open_from_transport, newt)
1211
 
 
1212
 
    def test_qualifier_preserved(self):
1213
 
        wt = self.make_branch_and_tree('branch')
1214
 
        old_url = self._qualified_url(self.old_server.host,
1215
 
                                      self.old_server.port)
1216
 
        start = self._transport(old_url).clone('branch')
1217
 
        bdir = bzrdir.BzrDir.open_from_transport(start)
1218
 
        # Redirection should preserve the qualifier, hence the transport class
1219
 
        # itself.
1220
 
        self.assertIsInstance(bdir.root_transport, type(start))
1221
 
 
1222
 
 
1223
 
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1224
 
                                  http_utils.TestCaseWithTwoWebservers):
1225
 
    """Tests redirections for urllib implementation"""
1226
 
 
1227
 
    _transport = HttpTransport_urllib
1228
 
 
1229
 
    def _qualified_url(self, host, port):
1230
 
        result = 'http+urllib://%s:%s' % (host, port)
1231
 
        self.permit_url(result)
1232
 
        return result
1233
 
 
1234
 
 
1235
 
 
1236
 
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1237
 
                                  TestHTTPRedirections,
1238
 
                                  http_utils.TestCaseWithTwoWebservers):
1239
 
    """Tests redirections for pycurl implementation"""
1240
 
 
1241
 
    def _qualified_url(self, host, port):
1242
 
        result = 'http+pycurl://%s:%s' % (host, port)
1243
 
        self.permit_url(result)
1244
 
        return result
1245
 
 
1246
 
 
1247
 
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1248
 
                                  http_utils.TestCaseWithTwoWebservers):
1249
 
    """Tests redirections for the nosmart decorator"""
1250
 
 
1251
 
    _transport = NoSmartTransportDecorator
1252
 
 
1253
 
    def _qualified_url(self, host, port):
1254
 
        result = 'nosmart+http://%s:%s' % (host, port)
1255
 
        self.permit_url(result)
1256
 
        return result
1257
 
 
1258
 
 
1259
 
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1260
 
                                    http_utils.TestCaseWithTwoWebservers):
1261
 
    """Tests redirections for readonly decorator"""
1262
 
 
1263
 
    _transport = ReadonlyTransportDecorator
1264
 
 
1265
 
    def _qualified_url(self, host, port):
1266
 
        result = 'readonly+http://%s:%s' % (host, port)
1267
 
        self.permit_url(result)
1268
 
        return result
1269
 
 
1270
 
 
1271
 
class TestDotBzrHidden(TestCaseWithTransport):
1272
 
 
1273
 
    ls = ['ls']
1274
 
    if sys.platform == 'win32':
1275
 
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1276
 
 
1277
 
    def get_ls(self):
1278
 
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1279
 
            stderr=subprocess.PIPE)
1280
 
        out, err = f.communicate()
1281
 
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1282
 
                         % (self.ls, err))
1283
 
        return out.splitlines()
1284
 
 
1285
 
    def test_dot_bzr_hidden(self):
1286
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1287
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1288
 
        b = bzrdir.BzrDir.create('.')
1289
 
        self.build_tree(['a'])
1290
 
        self.assertEquals(['a'], self.get_ls())
1291
 
 
1292
 
    def test_dot_bzr_hidden_with_url(self):
1293
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1294
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1295
 
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1296
 
        self.build_tree(['a'])
1297
 
        self.assertEquals(['a'], self.get_ls())
1298
 
 
1299
 
 
1300
 
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1301
 
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
1302
 
 
1303
 
    def _open(self, transport):
1304
 
        return _TestBzrDir(transport, self)
1305
 
 
1306
 
 
1307
 
class _TestBzrDir(bzrdir.BzrDirMeta1):
1308
 
    """Test BzrDir implementation for TestBzrDirSprout.
1309
 
 
1310
 
    When created a _TestBzrDir already has repository and a branch.  The branch
1311
 
    is a test double as well.
1312
 
    """
1313
 
 
1314
 
    def __init__(self, *args, **kwargs):
1315
 
        super(_TestBzrDir, self).__init__(*args, **kwargs)
1316
 
        self.test_branch = _TestBranch()
1317
 
        self.test_branch.repository = self.create_repository()
1318
 
 
1319
 
    def open_branch(self, unsupported=False):
1320
 
        return self.test_branch
1321
 
 
1322
 
    def cloning_metadir(self, require_stacking=False):
1323
 
        return _TestBzrDirFormat()
1324
 
 
1325
 
 
1326
 
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1327
 
    """Test Branch format for TestBzrDirSprout."""
1328
 
 
1329
 
 
1330
 
class _TestBranch(bzrlib.branch.Branch):
1331
 
    """Test Branch implementation for TestBzrDirSprout."""
1332
 
 
1333
 
    def __init__(self, *args, **kwargs):
1334
 
        self._format = _TestBranchFormat()
1335
 
        super(_TestBranch, self).__init__(*args, **kwargs)
1336
 
        self.calls = []
1337
 
        self._parent = None
1338
 
 
1339
 
    def sprout(self, *args, **kwargs):
1340
 
        self.calls.append('sprout')
1341
 
        return _TestBranch()
1342
 
 
1343
 
    def copy_content_into(self, destination, revision_id=None):
1344
 
        self.calls.append('copy_content_into')
1345
 
 
1346
 
    def get_parent(self):
1347
 
        return self._parent
1348
 
 
1349
 
    def set_parent(self, parent):
1350
 
        self._parent = parent
1351
 
 
1352
 
 
1353
 
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1354
 
 
1355
 
    def test_sprout_uses_branch_sprout(self):
1356
 
        """BzrDir.sprout calls Branch.sprout.
1357
 
 
1358
 
        Usually, BzrDir.sprout should delegate to the branch's sprout method
1359
 
        for part of the work.  This allows the source branch to control the
1360
 
        choice of format for the new branch.
1361
 
 
1362
 
        There are exceptions, but this test avoids them:
1363
 
          - if there's no branch in the source bzrdir,
1364
 
          - or if the stacking has been requested and the format needs to be
1365
 
            overridden to satisfy that.
1366
 
        """
1367
 
        # Make an instrumented bzrdir.
1368
 
        t = self.get_transport('source')
1369
 
        t.ensure_base()
1370
 
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1371
 
        # The instrumented bzrdir has a test_branch attribute that logs calls
1372
 
        # made to the branch contained in that bzrdir.  Initially the test
1373
 
        # branch exists but no calls have been made to it.
1374
 
        self.assertEqual([], source_bzrdir.test_branch.calls)
1375
 
 
1376
 
        # Sprout the bzrdir
1377
 
        target_url = self.get_url('target')
1378
 
        result = source_bzrdir.sprout(target_url, recurse='no')
1379
 
 
1380
 
        # The bzrdir called the branch's sprout method.
1381
 
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1382
 
 
1383
 
    def test_sprout_parent(self):
1384
 
        grandparent_tree = self.make_branch('grandparent')
1385
 
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1386
 
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
1387
 
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1388
 
 
1389
 
 
1390
 
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1391
 
 
1392
 
    def test_pre_open_called(self):
1393
 
        calls = []
1394
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1395
 
        transport = self.get_transport('foo')
1396
 
        url = transport.base
1397
 
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1398
 
        self.assertEqual([transport.base], [t.base for t in calls])
1399
 
 
1400
 
    def test_pre_open_actual_exceptions_raised(self):
1401
 
        count = [0]
1402
 
        def fail_once(transport):
1403
 
            count[0] += 1
1404
 
            if count[0] == 1:
1405
 
                raise errors.BzrError("fail")
1406
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1407
 
        transport = self.get_transport('foo')
1408
 
        url = transport.base
1409
 
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1410
 
        self.assertEqual('fail', err._preformatted_string)
1411
 
 
1412
 
    def test_post_repo_init(self):
1413
 
        from bzrlib.bzrdir import RepoInitHookParams
1414
 
        calls = []
1415
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1416
 
            calls.append, None)
1417
 
        self.make_repository('foo')
1418
 
        self.assertLength(1, calls)
1419
 
        params = calls[0]
1420
 
        self.assertIsInstance(params, RepoInitHookParams)
1421
 
        self.assertTrue(hasattr(params, 'bzrdir'))
1422
 
        self.assertTrue(hasattr(params, 'repository'))
1423
 
 
1424
 
    def test_post_repo_init_hook_repr(self):
1425
 
        param_reprs = []
1426
 
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
1427
 
            lambda params: param_reprs.append(repr(params)), None)
1428
 
        self.make_repository('foo')
1429
 
        self.assertLength(1, param_reprs)
1430
 
        param_repr = param_reprs[0]
1431
 
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
1432
 
 
1433
 
 
1434
 
class TestGenerateBackupName(TestCaseWithMemoryTransport):
1435
 
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
1436
 
    # moved to per_bzrdir or per_transport for better coverage ?
1437
 
    # -- vila 20100909
1438
 
 
1439
 
    def setUp(self):
1440
 
        super(TestGenerateBackupName, self).setUp()
1441
 
        self._transport = get_transport(self.get_url())
1442
 
        bzrdir.BzrDir.create(self.get_url(),
1443
 
            possible_transports=[self._transport])
1444
 
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
1445
 
 
1446
 
    def test_deprecated_generate_backup_name(self):
1447
 
        res = self.applyDeprecated(
1448
 
                symbol_versioning.deprecated_in((2, 3, 0)),
1449
 
                self._bzrdir.generate_backup_name, 'whatever')
1450
 
 
1451
 
    def test_new(self):
1452
 
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
1453
 
 
1454
 
    def test_exiting(self):
1455
 
        self._transport.put_bytes("a.~1~", "some content")
1456
 
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))