~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2011-08-17 18:13:57 UTC
  • mfrom: (5268.7.29 transport-segments)
  • Revision ID: pqm@pqm.ubuntu.com-20110817181357-y5q5eth1hk8bl3om
(jelmer) Allow specifying the colocated branch to use in the branch URL,
 and retrieving the branch name using ControlDir._get_selected_branch.
 (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006-2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Tests for the BzrDir facility and any format specific tests.
18
18
 
19
 
For interface contract tests, see tests/bzr_dir_implementations.
 
19
For interface contract tests, see tests/per_bzr_dir.
20
20
"""
21
21
 
22
22
import os
23
 
import os.path
24
 
from StringIO import StringIO
25
23
import subprocess
26
24
import sys
27
25
 
28
26
from bzrlib import (
 
27
    branch,
29
28
    bzrdir,
 
29
    config,
 
30
    controldir,
30
31
    errors,
31
32
    help_topics,
 
33
    lock,
32
34
    repository,
 
35
    revision as _mod_revision,
33
36
    osutils,
 
37
    remote,
34
38
    symbol_versioning,
 
39
    transport as _mod_transport,
35
40
    urlutils,
36
41
    win32utils,
37
 
    workingtree,
 
42
    workingtree_3,
 
43
    workingtree_4,
38
44
    )
39
45
import bzrlib.branch
40
 
from bzrlib.errors import (NotBranchError,
41
 
                           UnknownFormatError,
42
 
                           UnsupportedFormatError,
43
 
                           )
 
46
from bzrlib.errors import (
 
47
    NotBranchError,
 
48
    NoColocatedBranchSupport,
 
49
    UnknownFormatError,
 
50
    UnsupportedFormatError,
 
51
    )
44
52
from bzrlib.tests import (
45
53
    TestCase,
46
54
    TestCaseWithMemoryTransport,
47
55
    TestCaseWithTransport,
48
56
    TestSkipped,
49
 
    test_sftp_transport
50
57
    )
51
 
from bzrlib.tests.http_server import HttpServer
52
 
from bzrlib.tests.http_utils import (
53
 
    TestCaseWithTwoWebservers,
54
 
    HTTPServerRedirecting,
 
58
from bzrlib.tests import(
 
59
    http_server,
 
60
    http_utils,
55
61
    )
56
62
from bzrlib.tests.test_http import TestWithTransport_pycurl
57
 
from bzrlib.transport import get_transport
 
63
from bzrlib.transport import (
 
64
    memory,
 
65
    pathfilter,
 
66
    )
58
67
from bzrlib.transport.http._urllib import HttpTransport_urllib
59
 
from bzrlib.transport.memory import MemoryServer
60
 
from bzrlib.repofmt import knitrepo, weaverepo
 
68
from bzrlib.transport.nosmart import NoSmartTransportDecorator
 
69
from bzrlib.transport.readonly import ReadonlyTransportDecorator
 
70
from bzrlib.repofmt import knitrepo, knitpack_repo
61
71
 
62
72
 
63
73
class TestDefaultFormat(TestCase):
64
74
 
65
75
    def test_get_set_default_format(self):
66
76
        old_format = bzrdir.BzrDirFormat.get_default_format()
67
 
        # default is BzrDirFormat6
68
 
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
69
 
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
77
        # default is BzrDirMetaFormat1
 
78
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
 
79
        controldir.ControlDirFormat._set_default_format(SampleBzrDirFormat())
70
80
        # creating a bzr dir should now create an instrumented dir.
71
81
        try:
72
82
            result = bzrdir.BzrDir.create('memory:///')
73
 
            self.failUnless(isinstance(result, SampleBzrDir))
 
83
            self.assertIsInstance(result, SampleBzrDir)
74
84
        finally:
75
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
85
            controldir.ControlDirFormat._set_default_format(old_format)
76
86
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
77
87
 
78
88
 
 
89
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
 
90
    """A deprecated bzr dir format."""
 
91
 
 
92
 
79
93
class TestFormatRegistry(TestCase):
80
94
 
81
95
    def make_format_registry(self):
82
 
        my_format_registry = bzrdir.BzrDirFormatRegistry()
83
 
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
84
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
85
 
            ' repositories', deprecated=True)
86
 
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir', 
87
 
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
88
 
        my_format_registry.register_metadir('knit',
 
96
        my_format_registry = controldir.ControlDirFormatRegistry()
 
97
        my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
 
98
            'Some format.  Slower and unawesome and deprecated.',
 
99
            deprecated=True)
 
100
        my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
 
101
            'DeprecatedBzrDirFormat', 'Format registered lazily',
 
102
            deprecated=True)
 
103
        bzrdir.register_metadir(my_format_registry, 'knit',
89
104
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
90
105
            'Format using knits',
91
106
            )
92
107
        my_format_registry.set_default('knit')
93
 
        my_format_registry.register_metadir(
 
108
        bzrdir.register_metadir(my_format_registry,
94
109
            'branch6',
95
110
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
96
111
            'Experimental successor to knit.  Use at your own risk.',
97
112
            branch_format='bzrlib.branch.BzrBranchFormat6',
98
113
            experimental=True)
99
 
        my_format_registry.register_metadir(
 
114
        bzrdir.register_metadir(my_format_registry,
100
115
            'hidden format',
101
116
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
102
117
            'Experimental successor to knit.  Use at your own risk.',
103
118
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
104
 
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
105
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
106
 
            ' repositories', hidden=True)
107
 
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
108
 
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
109
 
            hidden=True)
 
119
        my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
 
120
            'Old format.  Slower and does not support things. ', hidden=True)
 
121
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
 
122
            'DeprecatedBzrDirFormat', 'Format registered lazily',
 
123
            deprecated=True, hidden=True)
110
124
        return my_format_registry
111
125
 
112
126
    def test_format_registry(self):
113
127
        my_format_registry = self.make_format_registry()
114
128
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
115
 
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
116
 
        my_bzrdir = my_format_registry.make_bzrdir('weave')
117
 
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
129
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
 
130
        my_bzrdir = my_format_registry.make_bzrdir('deprecated')
 
131
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
118
132
        my_bzrdir = my_format_registry.make_bzrdir('default')
119
 
        self.assertIsInstance(my_bzrdir.repository_format, 
 
133
        self.assertIsInstance(my_bzrdir.repository_format,
120
134
            knitrepo.RepositoryFormatKnit1)
121
135
        my_bzrdir = my_format_registry.make_bzrdir('knit')
122
 
        self.assertIsInstance(my_bzrdir.repository_format, 
 
136
        self.assertIsInstance(my_bzrdir.repository_format,
123
137
            knitrepo.RepositoryFormatKnit1)
124
138
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
125
139
        self.assertIsInstance(my_bzrdir.get_branch_format(),
129
143
        my_format_registry = self.make_format_registry()
130
144
        self.assertEqual('Format registered lazily',
131
145
                         my_format_registry.get_help('lazy'))
132
 
        self.assertEqual('Format using knits', 
 
146
        self.assertEqual('Format using knits',
133
147
                         my_format_registry.get_help('knit'))
134
 
        self.assertEqual('Format using knits', 
 
148
        self.assertEqual('Format using knits',
135
149
                         my_format_registry.get_help('default'))
136
 
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
137
 
                         ' checkouts or shared repositories', 
138
 
                         my_format_registry.get_help('weave'))
139
 
        
 
150
        self.assertEqual('Some format.  Slower and unawesome and deprecated.',
 
151
                         my_format_registry.get_help('deprecated'))
 
152
 
140
153
    def test_help_topic(self):
141
154
        topics = help_topics.HelpTopicRegistry()
142
 
        topics.register('formats', self.make_format_registry().help_topic, 
143
 
                        'Directory formats')
144
 
        topic = topics.get_detail('formats')
145
 
        new, rest = topic.split('Experimental formats')
 
155
        registry = self.make_format_registry()
 
156
        topics.register('current-formats', registry.help_topic,
 
157
                        'Current formats')
 
158
        topics.register('other-formats', registry.help_topic,
 
159
                        'Other formats')
 
160
        new = topics.get_detail('current-formats')
 
161
        rest = topics.get_detail('other-formats')
146
162
        experimental, deprecated = rest.split('Deprecated formats')
147
 
        self.assertContainsRe(new, 'These formats can be used')
148
 
        self.assertContainsRe(new, 
 
163
        self.assertContainsRe(new, 'formats-help')
 
164
        self.assertContainsRe(new,
149
165
                ':knit:\n    \(native\) \(default\) Format using knits\n')
150
 
        self.assertContainsRe(experimental, 
 
166
        self.assertContainsRe(experimental,
151
167
                ':branch6:\n    \(native\) Experimental successor to knit')
152
 
        self.assertContainsRe(deprecated, 
 
168
        self.assertContainsRe(deprecated,
153
169
                ':lazy:\n    \(native\) Format registered lazily\n')
154
170
        self.assertNotContainsRe(new, 'hidden')
155
171
 
162
178
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
163
179
                          bzrdir.format_registry.get('default'))
164
180
            self.assertIs(
165
 
                repository.RepositoryFormat.get_default_format().__class__,
 
181
                repository.format_registry.get_default().__class__,
166
182
                knitrepo.RepositoryFormatKnit3)
167
183
        finally:
168
184
            bzrdir.format_registry.set_default_repository(old_default)
169
185
 
170
186
    def test_aliases(self):
171
 
        a_registry = bzrdir.BzrDirFormatRegistry()
172
 
        a_registry.register('weave', bzrdir.BzrDirFormat6,
173
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
174
 
            ' repositories', deprecated=True)
175
 
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
176
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
177
 
            ' repositories', deprecated=True, alias=True)
178
 
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
179
 
    
 
187
        a_registry = controldir.ControlDirFormatRegistry()
 
188
        a_registry.register('deprecated', DeprecatedBzrDirFormat,
 
189
            'Old format.  Slower and does not support stuff',
 
190
            deprecated=True)
 
191
        a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
 
192
            'Old format.  Slower and does not support stuff',
 
193
            deprecated=True, alias=True)
 
194
        self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
 
195
 
180
196
 
181
197
class SampleBranch(bzrlib.branch.Branch):
182
198
    """A dummy branch for guess what, dummy use."""
185
201
        self.bzrdir = dir
186
202
 
187
203
 
 
204
class SampleRepository(bzrlib.repository.Repository):
 
205
    """A dummy repo."""
 
206
 
 
207
    def __init__(self, dir):
 
208
        self.bzrdir = dir
 
209
 
 
210
 
188
211
class SampleBzrDir(bzrdir.BzrDir):
189
212
    """A sample BzrDir implementation to allow testing static methods."""
190
213
 
194
217
 
195
218
    def open_repository(self):
196
219
        """See BzrDir.open_repository."""
197
 
        return "A repository"
 
220
        return SampleRepository(self)
198
221
 
199
 
    def create_branch(self):
 
222
    def create_branch(self, name=None):
200
223
        """See BzrDir.create_branch."""
 
224
        if name is not None:
 
225
            raise NoColocatedBranchSupport(self)
201
226
        return SampleBranch(self)
202
227
 
203
228
    def create_workingtree(self):
208
233
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
209
234
    """A sample format
210
235
 
211
 
    this format is initializable, unsupported to aid in testing the 
 
236
    this format is initializable, unsupported to aid in testing the
212
237
    open and open_downlevel routines.
213
238
    """
214
239
 
229
254
        return "opened branch."
230
255
 
231
256
 
 
257
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
 
258
 
 
259
    @staticmethod
 
260
    def get_format_string():
 
261
        return "Test format 1"
 
262
 
 
263
 
 
264
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
 
265
 
 
266
    @staticmethod
 
267
    def get_format_string():
 
268
        return "Test format 2"
 
269
 
 
270
 
232
271
class TestBzrDirFormat(TestCaseWithTransport):
233
272
    """Tests for the BzrDirFormat facility."""
234
273
 
235
274
    def test_find_format(self):
236
275
        # is the right format object found for a branch?
237
276
        # create a branch with a few known format objects.
238
 
        # this is not quite the same as 
239
 
        t = get_transport(self.get_url())
 
277
        bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
 
278
            BzrDirFormatTest1())
 
279
        self.addCleanup(bzrdir.BzrProber.formats.remove,
 
280
            BzrDirFormatTest1.get_format_string())
 
281
        bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
 
282
            BzrDirFormatTest2())
 
283
        self.addCleanup(bzrdir.BzrProber.formats.remove,
 
284
            BzrDirFormatTest2.get_format_string())
 
285
        t = self.get_transport()
240
286
        self.build_tree(["foo/", "bar/"], transport=t)
241
287
        def check_format(format, url):
242
288
            format.initialize(url)
243
 
            t = get_transport(url)
 
289
            t = _mod_transport.get_transport(url)
244
290
            found_format = bzrdir.BzrDirFormat.find_format(t)
245
 
            self.failUnless(isinstance(found_format, format.__class__))
246
 
        check_format(bzrdir.BzrDirFormat5(), "foo")
247
 
        check_format(bzrdir.BzrDirFormat6(), "bar")
248
 
        
 
291
            self.assertIsInstance(found_format, format.__class__)
 
292
        check_format(BzrDirFormatTest1(), "foo")
 
293
        check_format(BzrDirFormatTest2(), "bar")
 
294
 
249
295
    def test_find_format_nothing_there(self):
250
296
        self.assertRaises(NotBranchError,
251
297
                          bzrdir.BzrDirFormat.find_format,
252
 
                          get_transport('.'))
 
298
                          _mod_transport.get_transport('.'))
253
299
 
254
300
    def test_find_format_unknown_format(self):
255
 
        t = get_transport(self.get_url())
 
301
        t = self.get_transport()
256
302
        t.mkdir('.bzr')
257
303
        t.put_bytes('.bzr/branch-format', '')
258
304
        self.assertRaises(UnknownFormatError,
259
305
                          bzrdir.BzrDirFormat.find_format,
260
 
                          get_transport('.'))
 
306
                          _mod_transport.get_transport('.'))
261
307
 
262
308
    def test_register_unregister_format(self):
263
309
        format = SampleBzrDirFormat()
265
311
        # make a bzrdir
266
312
        format.initialize(url)
267
313
        # register a format for it.
268
 
        bzrdir.BzrDirFormat.register_format(format)
 
314
        bzrdir.BzrProber.formats.register(format.get_format_string(), format)
269
315
        # which bzrdir.Open will refuse (not supported)
270
316
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
271
317
        # which bzrdir.open_containing will refuse (not supported)
272
318
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
273
319
        # but open_downlevel will work
274
 
        t = get_transport(url)
 
320
        t = _mod_transport.get_transport(url)
275
321
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
276
322
        # unregister the format
277
 
        bzrdir.BzrDirFormat.unregister_format(format)
 
323
        bzrdir.BzrProber.formats.remove(format.get_format_string())
278
324
        # now open_downlevel should fail too.
279
325
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
280
326
 
295
341
                          branch.bzrdir.open_repository)
296
342
 
297
343
    def test_create_branch_and_repo_under_shared_force_new(self):
298
 
        # creating a branch and repo in a shared repo can be forced to 
 
344
        # creating a branch and repo in a shared repo can be forced to
299
345
        # make a new repo
300
346
        format = bzrdir.format_registry.make_bzrdir('knit')
301
347
        self.make_repository('.', shared=True, format=format)
306
352
 
307
353
    def test_create_standalone_working_tree(self):
308
354
        format = SampleBzrDirFormat()
309
 
        # note this is deliberately readonly, as this failure should 
 
355
        # note this is deliberately readonly, as this failure should
310
356
        # occur before any writes.
311
357
        self.assertRaises(errors.NotLocalUrl,
312
358
                          bzrdir.BzrDir.create_standalone_workingtree,
313
359
                          self.get_readonly_url(), format=format)
314
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('.', 
 
360
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
315
361
                                                           format=format)
316
362
        self.assertEqual('A tree', tree)
317
363
 
319
365
        # create standalone working tree always makes a repo.
320
366
        format = bzrdir.format_registry.make_bzrdir('knit')
321
367
        self.make_repository('.', shared=True, format=format)
322
 
        # note this is deliberately readonly, as this failure should 
 
368
        # note this is deliberately readonly, as this failure should
323
369
        # occur before any writes.
324
370
        self.assertRaises(errors.NotLocalUrl,
325
371
                          bzrdir.BzrDir.create_standalone_workingtree,
326
372
                          self.get_readonly_url('child'), format=format)
327
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('child', 
 
373
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
328
374
            format=format)
329
375
        tree.bzrdir.open_repository()
330
376
 
346
392
 
347
393
    def test_create_branch_convenience_root(self):
348
394
        """Creating a branch at the root of a fs should work."""
349
 
        self.vfs_transport_factory = MemoryServer
 
395
        self.vfs_transport_factory = memory.MemoryServer
350
396
        # outside a repo the default convenience output is a repo+branch_tree
351
397
        format = bzrdir.format_registry.make_bzrdir('knit')
352
 
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(), 
 
398
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
353
399
                                                         format=format)
354
400
        self.assertRaises(errors.NoWorkingTree,
355
401
                          branch.bzrdir.open_workingtree)
365
411
        branch.bzrdir.open_workingtree()
366
412
        self.assertRaises(errors.NoRepositoryPresent,
367
413
                          branch.bzrdir.open_repository)
368
 
            
 
414
 
369
415
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
370
416
        # inside a repo the default convenience output is a branch+ follow the
371
417
        # repo tree policy but we can override that
377
423
                          branch.bzrdir.open_workingtree)
378
424
        self.assertRaises(errors.NoRepositoryPresent,
379
425
                          branch.bzrdir.open_repository)
380
 
            
 
426
 
381
427
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
382
428
        # inside a repo the default convenience output is a branch+ follow the
383
429
        # repo tree policy
384
430
        format = bzrdir.format_registry.make_bzrdir('knit')
385
431
        repo = self.make_repository('.', shared=True, format=format)
386
432
        repo.set_make_working_trees(False)
387
 
        branch = bzrdir.BzrDir.create_branch_convenience('child', 
 
433
        branch = bzrdir.BzrDir.create_branch_convenience('child',
388
434
                                                         format=format)
389
435
        self.assertRaises(errors.NoWorkingTree,
390
436
                          branch.bzrdir.open_workingtree)
420
466
        """The default acquisition policy should create a standalone branch."""
421
467
        my_bzrdir = self.make_bzrdir('.')
422
468
        repo_policy = my_bzrdir.determine_repository_policy()
423
 
        repo = repo_policy.acquire_repository()
 
469
        repo, is_new = repo_policy.acquire_repository()
424
470
        self.assertEqual(repo.bzrdir.root_transport.base,
425
471
                         my_bzrdir.root_transport.base)
426
472
        self.assertFalse(repo.is_shared())
427
473
 
428
 
 
429
474
    def test_determine_stacking_policy(self):
430
475
        parent_bzrdir = self.make_bzrdir('.')
431
476
        child_bzrdir = self.make_bzrdir('child')
442
487
        self.assertEqual(parent_bzrdir.root_transport.base,
443
488
                         repo_policy._stack_on_pwd)
444
489
 
445
 
    def prepare_default_stacking(self, child_format='development1'):
 
490
    def prepare_default_stacking(self, child_format='1.6'):
446
491
        parent_bzrdir = self.make_bzrdir('.')
447
492
        child_branch = self.make_branch('child', format=child_format)
448
493
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
455
500
        self.assertEqual(child_branch.base,
456
501
                         new_child.open_branch().get_stacked_on_url())
457
502
 
 
503
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
 
504
        # Make stackable source branch with an unstackable repo format.
 
505
        source_bzrdir = self.make_bzrdir('source')
 
506
        knitpack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
 
507
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(
 
508
            source_bzrdir)
 
509
        # Make a directory with a default stacking policy
 
510
        parent_bzrdir = self.make_bzrdir('parent')
 
511
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
 
512
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
 
513
        # Clone source into directory
 
514
        target = source_bzrdir.clone(self.get_url('parent/target'))
 
515
 
458
516
    def test_sprout_obeys_stacking_policy(self):
459
517
        child_branch, new_child_transport = self.prepare_default_stacking()
460
518
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
507
565
        self.assertTrue(repo.supports_rich_root())
508
566
 
509
567
    def test_add_fallback_repo_handles_absolute_urls(self):
510
 
        stack_on = self.make_branch('stack_on', format='development1')
511
 
        repo = self.make_repository('repo', format='development1')
 
568
        stack_on = self.make_branch('stack_on', format='1.6')
 
569
        repo = self.make_repository('repo', format='1.6')
512
570
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
513
571
        policy._add_fallback(repo)
514
572
 
515
573
    def test_add_fallback_repo_handles_relative_urls(self):
516
 
        stack_on = self.make_branch('stack_on', format='development1')
517
 
        repo = self.make_repository('repo', format='development1')
 
574
        stack_on = self.make_branch('stack_on', format='1.6')
 
575
        repo = self.make_repository('repo', format='1.6')
518
576
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
519
577
        policy._add_fallback(repo)
520
578
 
521
579
    def test_configure_relative_branch_stacking_url(self):
522
 
        stack_on = self.make_branch('stack_on', format='development1')
523
 
        stacked = self.make_branch('stack_on/stacked', format='development1')
 
580
        stack_on = self.make_branch('stack_on', format='1.6')
 
581
        stacked = self.make_branch('stack_on/stacked', format='1.6')
524
582
        policy = bzrdir.UseExistingRepository(stacked.repository,
525
583
            '.', stack_on.base)
526
584
        policy.configure_branch(stacked)
527
585
        self.assertEqual('..', stacked.get_stacked_on_url())
528
586
 
529
587
    def test_relative_branch_stacking_to_absolute(self):
530
 
        stack_on = self.make_branch('stack_on', format='development1')
531
 
        stacked = self.make_branch('stack_on/stacked', format='development1')
 
588
        stack_on = self.make_branch('stack_on', format='1.6')
 
589
        stacked = self.make_branch('stack_on/stacked', format='1.6')
532
590
        policy = bzrdir.UseExistingRepository(stacked.repository,
533
591
            '.', self.get_readonly_url('stack_on'))
534
592
        policy.configure_branch(stacked)
546
604
 
547
605
    def setUp(self):
548
606
        super(ChrootedTests, self).setUp()
549
 
        if not self.vfs_transport_factory == MemoryServer:
550
 
            self.transport_readonly_server = HttpServer
 
607
        if not self.vfs_transport_factory == memory.MemoryServer:
 
608
            self.transport_readonly_server = http_server.HttpServer
551
609
 
552
610
    def local_branch_path(self, branch):
553
611
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
651
709
        self.assertEqual(relpath, 'baz')
652
710
 
653
711
    def test_open_containing_from_transport(self):
654
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
655
 
                          get_transport(self.get_readonly_url('')))
656
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
657
 
                          get_transport(self.get_readonly_url('g/p/q')))
 
712
        self.assertRaises(NotBranchError,
 
713
            bzrdir.BzrDir.open_containing_from_transport,
 
714
            _mod_transport.get_transport(self.get_readonly_url('')))
 
715
        self.assertRaises(NotBranchError,
 
716
            bzrdir.BzrDir.open_containing_from_transport,
 
717
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
658
718
        control = bzrdir.BzrDir.create(self.get_url())
659
719
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
660
 
            get_transport(self.get_readonly_url('')))
 
720
            _mod_transport.get_transport(self.get_readonly_url('')))
661
721
        self.assertEqual('', relpath)
662
722
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
663
 
            get_transport(self.get_readonly_url('g/p/q')))
 
723
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
664
724
        self.assertEqual('g/p/q', relpath)
665
725
 
666
726
    def test_open_containing_tree_or_branch(self):
710
770
        # transport pointing at bzrdir should give a bzrdir with root transport
711
771
        # set to the given transport
712
772
        control = bzrdir.BzrDir.create(self.get_url())
713
 
        transport = get_transport(self.get_url())
714
 
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
715
 
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
 
773
        t = self.get_transport()
 
774
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
 
775
        self.assertEqual(t.base, opened_bzrdir.root_transport.base)
716
776
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
717
 
        
 
777
 
718
778
    def test_open_from_transport_no_bzrdir(self):
719
 
        transport = get_transport(self.get_url())
720
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
721
 
                          transport)
 
779
        t = self.get_transport()
 
780
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
722
781
 
723
782
    def test_open_from_transport_bzrdir_in_parent(self):
724
783
        control = bzrdir.BzrDir.create(self.get_url())
725
 
        transport = get_transport(self.get_url())
726
 
        transport.mkdir('subdir')
727
 
        transport = transport.clone('subdir')
728
 
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
729
 
                          transport)
 
784
        t = self.get_transport()
 
785
        t.mkdir('subdir')
 
786
        t = t.clone('subdir')
 
787
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport, t)
730
788
 
731
789
    def test_sprout_recursive(self):
732
 
        tree = self.make_branch_and_tree('tree1', format='dirstate-with-subtree')
 
790
        tree = self.make_branch_and_tree('tree1',
 
791
                                         format='dirstate-with-subtree')
733
792
        sub_tree = self.make_branch_and_tree('tree1/subtree',
734
793
            format='dirstate-with-subtree')
 
794
        sub_tree.set_root_id('subtree-root')
735
795
        tree.add_reference(sub_tree)
736
796
        self.build_tree(['tree1/subtree/file'])
737
797
        sub_tree.add('file')
738
798
        tree.commit('Initial commit')
739
 
        tree.bzrdir.sprout('tree2')
740
 
        self.failUnlessExists('tree2/subtree/file')
 
799
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
 
800
        tree2.lock_read()
 
801
        self.addCleanup(tree2.unlock)
 
802
        self.assertPathExists('tree2/subtree/file')
 
803
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
741
804
 
742
805
    def test_cloning_metadir(self):
743
806
        """Ensure that cloning metadir is suitable"""
746
809
        branch = self.make_branch('branch', format='knit')
747
810
        format = branch.bzrdir.cloning_metadir()
748
811
        self.assertIsInstance(format.workingtree_format,
749
 
            workingtree.WorkingTreeFormat3)
 
812
            workingtree_4.WorkingTreeFormat6)
750
813
 
751
814
    def test_sprout_recursive_treeless(self):
752
815
        tree = self.make_branch_and_tree('tree1',
757
820
        self.build_tree(['tree1/subtree/file'])
758
821
        sub_tree.add('file')
759
822
        tree.commit('Initial commit')
 
823
        # The following line force the orhaning to reveal bug #634470
 
824
        tree.branch.get_config().set_user_option(
 
825
            'bzr.transform.orphan_policy', 'move')
760
826
        tree.bzrdir.destroy_workingtree()
 
827
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
 
828
        # fail :-( ) -- vila 20100909
761
829
        repo = self.make_repository('repo', shared=True,
762
830
            format='dirstate-with-subtree')
763
831
        repo.set_make_working_trees(False)
764
 
        tree.bzrdir.sprout('repo/tree2')
765
 
        self.failUnlessExists('repo/tree2/subtree')
766
 
        self.failIfExists('repo/tree2/subtree/file')
 
832
        # FIXME: we just deleted the workingtree and now we want to use it ????
 
833
        # At a minimum, we should use tree.branch below (but this fails too
 
834
        # currently) or stop calling this test 'treeless'. Specifically, I've
 
835
        # turn the line below into an assertRaises when 'subtree/.bzr' is
 
836
        # orphaned and sprout tries to access the branch there (which is left
 
837
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
 
838
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
 
839
        # #634470.  -- vila 20100909
 
840
        self.assertRaises(errors.NotBranchError,
 
841
                          tree.bzrdir.sprout, 'repo/tree2')
 
842
#        self.assertPathExists('repo/tree2/subtree')
 
843
#        self.assertPathDoesNotExist('repo/tree2/subtree/file')
767
844
 
768
845
    def make_foo_bar_baz(self):
769
846
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
773
850
 
774
851
    def test_find_bzrdirs(self):
775
852
        foo, bar, baz = self.make_foo_bar_baz()
776
 
        transport = get_transport(self.get_url())
777
 
        self.assertEqualBzrdirs([baz, foo, bar],
778
 
                                bzrdir.BzrDir.find_bzrdirs(transport))
 
853
        t = self.get_transport()
 
854
        self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))
 
855
 
 
856
    def make_fake_permission_denied_transport(self, transport, paths):
 
857
        """Create a transport that raises PermissionDenied for some paths."""
 
858
        def filter(path):
 
859
            if path in paths:
 
860
                raise errors.PermissionDenied(path)
 
861
            return path
 
862
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
 
863
        path_filter_server.start_server()
 
864
        self.addCleanup(path_filter_server.stop_server)
 
865
        path_filter_transport = pathfilter.PathFilteringTransport(
 
866
            path_filter_server, '.')
 
867
        return (path_filter_server, path_filter_transport)
 
868
 
 
869
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
 
870
        """Check that each branch url ends with the given suffix."""
 
871
        for actual_bzrdir in actual_bzrdirs:
 
872
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
 
873
 
 
874
    def test_find_bzrdirs_permission_denied(self):
 
875
        foo, bar, baz = self.make_foo_bar_baz()
 
876
        t = self.get_transport()
 
877
        path_filter_server, path_filter_transport = \
 
878
            self.make_fake_permission_denied_transport(t, ['foo'])
 
879
        # local transport
 
880
        self.assertBranchUrlsEndWith('/baz/',
 
881
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
 
882
        # smart server
 
883
        smart_transport = self.make_smart_server('.',
 
884
            backing_server=path_filter_server)
 
885
        self.assertBranchUrlsEndWith('/baz/',
 
886
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
779
887
 
780
888
    def test_find_bzrdirs_list_current(self):
781
889
        def list_current(transport):
782
890
            return [s for s in transport.list_dir('') if s != 'baz']
783
891
 
784
892
        foo, bar, baz = self.make_foo_bar_baz()
785
 
        transport = get_transport(self.get_url())
786
 
        self.assertEqualBzrdirs([foo, bar],
787
 
                                bzrdir.BzrDir.find_bzrdirs(transport,
788
 
                                    list_current=list_current))
789
 
 
 
893
        t = self.get_transport()
 
894
        self.assertEqualBzrdirs(
 
895
            [foo, bar],
 
896
            bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))
790
897
 
791
898
    def test_find_bzrdirs_evaluate(self):
792
899
        def evaluate(bzrdir):
798
905
                return False, bzrdir.root_transport.base
799
906
 
800
907
        foo, bar, baz = self.make_foo_bar_baz()
801
 
        transport = get_transport(self.get_url())
 
908
        t = self.get_transport()
802
909
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
803
 
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
804
 
                                                         evaluate=evaluate)))
 
910
                         list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))
805
911
 
806
912
    def assertEqualBzrdirs(self, first, second):
807
913
        first = list(first)
814
920
        root = self.make_repository('', shared=True)
815
921
        foo, bar, baz = self.make_foo_bar_baz()
816
922
        qux = self.make_bzrdir('foo/qux')
817
 
        transport = get_transport(self.get_url())
818
 
        branches = bzrdir.BzrDir.find_branches(transport)
 
923
        t = self.get_transport()
 
924
        branches = bzrdir.BzrDir.find_branches(t)
819
925
        self.assertEqual(baz.root_transport.base, branches[0].base)
820
926
        self.assertEqual(foo.root_transport.base, branches[1].base)
821
927
        self.assertEqual(bar.root_transport.base, branches[2].base)
822
928
 
823
929
        # ensure this works without a top-level repo
824
 
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
 
930
        branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
825
931
        self.assertEqual(foo.root_transport.base, branches[0].base)
826
932
        self.assertEqual(bar.root_transport.base, branches[1].base)
827
933
 
828
934
 
 
935
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
 
936
 
 
937
    def test_find_bzrdirs_missing_repo(self):
 
938
        t = self.get_transport()
 
939
        arepo = self.make_repository('arepo', shared=True)
 
940
        abranch_url = arepo.user_url + '/abranch'
 
941
        abranch = bzrdir.BzrDir.create(abranch_url).create_branch()
 
942
        t.delete_tree('arepo/.bzr')
 
943
        self.assertRaises(errors.NoRepositoryPresent,
 
944
            branch.Branch.open, abranch_url)
 
945
        self.make_branch('baz')
 
946
        for actual_bzrdir in bzrdir.BzrDir.find_branches(t):
 
947
            self.assertEndsWith(actual_bzrdir.user_url, '/baz/')
 
948
 
 
949
 
829
950
class TestMeta1DirFormat(TestCaseWithTransport):
830
951
    """Tests specific to the meta1 dir format."""
831
952
 
838
959
                         dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
839
960
        repository_base = t.clone('repository').base
840
961
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
 
962
        repository_format = repository.format_registry.get_default()
841
963
        self.assertEqual(repository_base,
842
 
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
 
964
                         dir.get_repository_transport(repository_format).base)
843
965
        checkout_base = t.clone('checkout').base
844
966
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
845
967
        self.assertEqual(checkout_base,
846
 
                         dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
 
968
                         dir.get_workingtree_transport(workingtree_3.WorkingTreeFormat3()).base)
847
969
 
848
970
    def test_meta1dir_uses_lockdir(self):
849
971
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
869
991
 
870
992
    def test_needs_conversion_different_working_tree(self):
871
993
        # meta1dirs need an conversion if any element is not the default.
872
 
        old_format = bzrdir.BzrDirFormat.get_default_format()
873
 
        # test with 
874
 
        new_default = bzrdir.format_registry.make_bzrdir('dirstate')
875
 
        bzrdir.BzrDirFormat._set_default_format(new_default)
876
 
        try:
877
 
            tree = self.make_branch_and_tree('tree', format='knit')
878
 
            self.assertTrue(tree.bzrdir.needs_format_conversion())
879
 
        finally:
880
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
881
 
 
882
 
 
883
 
class TestFormat5(TestCaseWithTransport):
884
 
    """Tests specific to the version 5 bzrdir format."""
885
 
 
886
 
    def test_same_lockfiles_between_tree_repo_branch(self):
887
 
        # this checks that only a single lockfiles instance is created 
888
 
        # for format 5 objects
889
 
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
890
 
        def check_dir_components_use_same_lock(dir):
891
 
            ctrl_1 = dir.open_repository().control_files
892
 
            ctrl_2 = dir.open_branch().control_files
893
 
            ctrl_3 = dir.open_workingtree()._control_files
894
 
            self.assertTrue(ctrl_1 is ctrl_2)
895
 
            self.assertTrue(ctrl_2 is ctrl_3)
896
 
        check_dir_components_use_same_lock(dir)
897
 
        # and if we open it normally.
898
 
        dir = bzrdir.BzrDir.open(self.get_url())
899
 
        check_dir_components_use_same_lock(dir)
900
 
    
901
 
    def test_can_convert(self):
902
 
        # format 5 dirs are convertable
903
 
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
904
 
        self.assertTrue(dir.can_convert_format())
905
 
    
906
 
    def test_needs_conversion(self):
907
 
        # format 5 dirs need a conversion if they are not the default.
908
 
        # and they start of not the default.
909
 
        old_format = bzrdir.BzrDirFormat.get_default_format()
910
 
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
911
 
        try:
912
 
            dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
913
 
            self.assertFalse(dir.needs_format_conversion())
914
 
        finally:
915
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
916
 
        self.assertTrue(dir.needs_format_conversion())
917
 
 
918
 
 
919
 
class TestFormat6(TestCaseWithTransport):
920
 
    """Tests specific to the version 6 bzrdir format."""
921
 
 
922
 
    def test_same_lockfiles_between_tree_repo_branch(self):
923
 
        # this checks that only a single lockfiles instance is created 
924
 
        # for format 6 objects
925
 
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
926
 
        def check_dir_components_use_same_lock(dir):
927
 
            ctrl_1 = dir.open_repository().control_files
928
 
            ctrl_2 = dir.open_branch().control_files
929
 
            ctrl_3 = dir.open_workingtree()._control_files
930
 
            self.assertTrue(ctrl_1 is ctrl_2)
931
 
            self.assertTrue(ctrl_2 is ctrl_3)
932
 
        check_dir_components_use_same_lock(dir)
933
 
        # and if we open it normally.
934
 
        dir = bzrdir.BzrDir.open(self.get_url())
935
 
        check_dir_components_use_same_lock(dir)
936
 
    
937
 
    def test_can_convert(self):
938
 
        # format 6 dirs are convertable
939
 
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
940
 
        self.assertTrue(dir.can_convert_format())
941
 
    
942
 
    def test_needs_conversion(self):
943
 
        # format 6 dirs need an conversion if they are not the default.
944
 
        old_format = bzrdir.BzrDirFormat.get_default_format()
945
 
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
946
 
        try:
947
 
            dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
948
 
            self.assertTrue(dir.needs_format_conversion())
949
 
        finally:
950
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
951
 
 
952
 
 
953
 
class NotBzrDir(bzrlib.bzrdir.BzrDir):
954
 
    """A non .bzr based control directory."""
955
 
 
956
 
    def __init__(self, transport, format):
957
 
        self._format = format
958
 
        self.root_transport = transport
959
 
        self.transport = transport.clone('.not')
960
 
 
961
 
 
962
 
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
963
 
    """A test class representing any non-.bzr based disk format."""
964
 
 
965
 
    def initialize_on_transport(self, transport):
966
 
        """Initialize a new .not dir in the base directory of a Transport."""
967
 
        transport.mkdir('.not')
968
 
        return self.open(transport)
969
 
 
970
 
    def open(self, transport):
971
 
        """Open this directory."""
972
 
        return NotBzrDir(transport, self)
973
 
 
974
 
    @classmethod
975
 
    def _known_formats(self):
976
 
        return set([NotBzrDirFormat()])
977
 
 
978
 
    @classmethod
979
 
    def probe_transport(self, transport):
980
 
        """Our format is present if the transport ends in '.not/'."""
981
 
        if transport.has('.not'):
982
 
            return NotBzrDirFormat()
983
 
 
984
 
 
985
 
class TestNotBzrDir(TestCaseWithTransport):
986
 
    """Tests for using the bzrdir api with a non .bzr based disk format.
987
 
    
988
 
    If/when one of these is in the core, we can let the implementation tests
989
 
    verify this works.
990
 
    """
991
 
 
992
 
    def test_create_and_find_format(self):
993
 
        # create a .notbzr dir 
994
 
        format = NotBzrDirFormat()
995
 
        dir = format.initialize(self.get_url())
996
 
        self.assertIsInstance(dir, NotBzrDir)
997
 
        # now probe for it.
998
 
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
999
 
        try:
1000
 
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
1001
 
                get_transport(self.get_url()))
1002
 
            self.assertIsInstance(found, NotBzrDirFormat)
1003
 
        finally:
1004
 
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
1005
 
 
1006
 
    def test_included_in_known_formats(self):
1007
 
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
1008
 
        try:
1009
 
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
1010
 
            for format in formats:
1011
 
                if isinstance(format, NotBzrDirFormat):
1012
 
                    return
1013
 
            self.fail("No NotBzrDirFormat in %s" % formats)
1014
 
        finally:
1015
 
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
 
994
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
995
        tree = self.make_branch_and_tree('tree', format='knit')
 
996
        self.assertTrue(tree.bzrdir.needs_format_conversion(
 
997
            new_format))
 
998
 
 
999
    def test_initialize_on_format_uses_smart_transport(self):
 
1000
        self.setup_smart_server_with_call_log()
 
1001
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
1002
        transport = self.get_transport('target')
 
1003
        transport.ensure_base()
 
1004
        self.reset_smart_call_log()
 
1005
        instance = new_format.initialize_on_transport(transport)
 
1006
        self.assertIsInstance(instance, remote.RemoteBzrDir)
 
1007
        rpc_count = len(self.hpss_calls)
 
1008
        # This figure represent the amount of work to perform this use case. It
 
1009
        # is entirely ok to reduce this number if a test fails due to rpc_count
 
1010
        # being too low. If rpc_count increases, more network roundtrips have
 
1011
        # become necessary for this use case. Please do not adjust this number
 
1012
        # upwards without agreement from bzr's network support maintainers.
 
1013
        self.assertEqual(2, rpc_count)
1016
1014
 
1017
1015
 
1018
1016
class NonLocalTests(TestCaseWithTransport):
1020
1018
 
1021
1019
    def setUp(self):
1022
1020
        super(NonLocalTests, self).setUp()
1023
 
        self.vfs_transport_factory = MemoryServer
1024
 
    
 
1021
        self.vfs_transport_factory = memory.MemoryServer
 
1022
 
1025
1023
    def test_create_branch_convenience(self):
1026
1024
        # outside a repo the default convenience output is a repo+branch_tree
1027
1025
        format = bzrdir.format_registry.make_bzrdir('knit')
1039
1037
            self.get_url('foo'),
1040
1038
            force_new_tree=True,
1041
1039
            format=format)
1042
 
        t = get_transport(self.get_url('.'))
 
1040
        t = self.get_transport()
1043
1041
        self.assertFalse(t.has('foo'))
1044
1042
 
1045
1043
    def test_clone(self):
1061
1059
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
1062
1060
        checkout_format = my_bzrdir.checkout_metadir()
1063
1061
        self.assertIsInstance(checkout_format.workingtree_format,
1064
 
                              workingtree.WorkingTreeFormat3)
1065
 
 
1066
 
 
1067
 
class TestHTTPRedirectionLoop(object):
1068
 
    """Test redirection loop between two http servers.
 
1062
                              workingtree_4.WorkingTreeFormat4)
 
1063
 
 
1064
 
 
1065
class TestHTTPRedirections(object):
 
1066
    """Test redirection between two http servers.
1069
1067
 
1070
1068
    This MUST be used by daughter classes that also inherit from
1071
1069
    TestCaseWithTwoWebservers.
1072
1070
 
1073
1071
    We can't inherit directly from TestCaseWithTwoWebservers or the
1074
1072
    test framework will try to create an instance which cannot
1075
 
    run, its implementation being incomplete. 
 
1073
    run, its implementation being incomplete.
1076
1074
    """
1077
1075
 
1078
 
    # Should be defined by daughter classes to ensure redirection
1079
 
    # still use the same transport implementation (not currently
1080
 
    # enforced as it's a bit tricky to get right (see the FIXME
1081
 
    # in BzrDir.open_from_transport for the unique use case so
1082
 
    # far)
1083
 
    _qualifier = None
1084
 
 
1085
1076
    def create_transport_readonly_server(self):
1086
 
        return HTTPServerRedirecting()
 
1077
        # We don't set the http protocol version, relying on the default
 
1078
        return http_utils.HTTPServerRedirecting()
1087
1079
 
1088
1080
    def create_transport_secondary_server(self):
1089
 
        return HTTPServerRedirecting()
 
1081
        # We don't set the http protocol version, relying on the default
 
1082
        return http_utils.HTTPServerRedirecting()
1090
1083
 
1091
1084
    def setUp(self):
1092
 
        # Both servers redirect to each server creating a loop
1093
 
        super(TestHTTPRedirectionLoop, self).setUp()
 
1085
        super(TestHTTPRedirections, self).setUp()
1094
1086
        # The redirections will point to the new server
1095
1087
        self.new_server = self.get_readonly_server()
1096
1088
        # The requests to the old server will be redirected
1097
1089
        self.old_server = self.get_secondary_server()
1098
1090
        # Configure the redirections
1099
1091
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
1092
 
 
1093
    def test_loop(self):
 
1094
        # Both servers redirect to each other creating a loop
1100
1095
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1101
 
 
1102
 
    def _qualified_url(self, host, port):
1103
 
        return 'http+%s://%s:%s' % (self._qualifier, host, port)
1104
 
 
1105
 
    def test_loop(self):
1106
1096
        # Starting from either server should loop
1107
 
        old_url = self._qualified_url(self.old_server.host, 
 
1097
        old_url = self._qualified_url(self.old_server.host,
1108
1098
                                      self.old_server.port)
1109
1099
        oldt = self._transport(old_url)
1110
1100
        self.assertRaises(errors.NotBranchError,
1111
1101
                          bzrdir.BzrDir.open_from_transport, oldt)
1112
 
        new_url = self._qualified_url(self.new_server.host, 
 
1102
        new_url = self._qualified_url(self.new_server.host,
1113
1103
                                      self.new_server.port)
1114
1104
        newt = self._transport(new_url)
1115
1105
        self.assertRaises(errors.NotBranchError,
1116
1106
                          bzrdir.BzrDir.open_from_transport, newt)
1117
1107
 
1118
 
 
1119
 
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
1120
 
                                  TestCaseWithTwoWebservers):
 
1108
    def test_qualifier_preserved(self):
 
1109
        wt = self.make_branch_and_tree('branch')
 
1110
        old_url = self._qualified_url(self.old_server.host,
 
1111
                                      self.old_server.port)
 
1112
        start = self._transport(old_url).clone('branch')
 
1113
        bdir = bzrdir.BzrDir.open_from_transport(start)
 
1114
        # Redirection should preserve the qualifier, hence the transport class
 
1115
        # itself.
 
1116
        self.assertIsInstance(bdir.root_transport, type(start))
 
1117
 
 
1118
 
 
1119
class TestHTTPRedirections_urllib(TestHTTPRedirections,
 
1120
                                  http_utils.TestCaseWithTwoWebservers):
1121
1121
    """Tests redirections for urllib implementation"""
1122
1122
 
1123
 
    _qualifier = 'urllib'
1124
1123
    _transport = HttpTransport_urllib
1125
1124
 
 
1125
    def _qualified_url(self, host, port):
 
1126
        result = 'http+urllib://%s:%s' % (host, port)
 
1127
        self.permit_url(result)
 
1128
        return result
 
1129
 
1126
1130
 
1127
1131
 
1128
1132
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1129
 
                                  TestHTTPRedirectionLoop,
1130
 
                                  TestCaseWithTwoWebservers):
 
1133
                                  TestHTTPRedirections,
 
1134
                                  http_utils.TestCaseWithTwoWebservers):
1131
1135
    """Tests redirections for pycurl implementation"""
1132
1136
 
1133
 
    _qualifier = 'pycurl'
 
1137
    def _qualified_url(self, host, port):
 
1138
        result = 'http+pycurl://%s:%s' % (host, port)
 
1139
        self.permit_url(result)
 
1140
        return result
 
1141
 
 
1142
 
 
1143
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
 
1144
                                  http_utils.TestCaseWithTwoWebservers):
 
1145
    """Tests redirections for the nosmart decorator"""
 
1146
 
 
1147
    _transport = NoSmartTransportDecorator
 
1148
 
 
1149
    def _qualified_url(self, host, port):
 
1150
        result = 'nosmart+http://%s:%s' % (host, port)
 
1151
        self.permit_url(result)
 
1152
        return result
 
1153
 
 
1154
 
 
1155
class TestHTTPRedirections_readonly(TestHTTPRedirections,
 
1156
                                    http_utils.TestCaseWithTwoWebservers):
 
1157
    """Tests redirections for readonly decoratror"""
 
1158
 
 
1159
    _transport = ReadonlyTransportDecorator
 
1160
 
 
1161
    def _qualified_url(self, host, port):
 
1162
        result = 'readonly+http://%s:%s' % (host, port)
 
1163
        self.permit_url(result)
 
1164
        return result
1134
1165
 
1135
1166
 
1136
1167
class TestDotBzrHidden(TestCaseWithTransport):
1171
1202
 
1172
1203
class _TestBzrDir(bzrdir.BzrDirMeta1):
1173
1204
    """Test BzrDir implementation for TestBzrDirSprout.
1174
 
    
 
1205
 
1175
1206
    When created a _TestBzrDir already has repository and a branch.  The branch
1176
1207
    is a test double as well.
1177
1208
    """
1178
1209
 
1179
1210
    def __init__(self, *args, **kwargs):
1180
1211
        super(_TestBzrDir, self).__init__(*args, **kwargs)
1181
 
        self.test_branch = _TestBranch()
 
1212
        self.test_branch = _TestBranch(self.transport)
1182
1213
        self.test_branch.repository = self.create_repository()
1183
1214
 
1184
1215
    def open_branch(self, unsupported=False):
1188
1219
        return _TestBzrDirFormat()
1189
1220
 
1190
1221
 
 
1222
class _TestBranchFormat(bzrlib.branch.BranchFormat):
 
1223
    """Test Branch format for TestBzrDirSprout."""
 
1224
 
 
1225
 
1191
1226
class _TestBranch(bzrlib.branch.Branch):
1192
1227
    """Test Branch implementation for TestBzrDirSprout."""
1193
1228
 
1194
 
    def __init__(self, *args, **kwargs):
 
1229
    def __init__(self, transport, *args, **kwargs):
 
1230
        self._format = _TestBranchFormat()
 
1231
        self._transport = transport
 
1232
        self.base = transport.base
1195
1233
        super(_TestBranch, self).__init__(*args, **kwargs)
1196
1234
        self.calls = []
1197
1235
        self._parent = None
1198
1236
 
1199
1237
    def sprout(self, *args, **kwargs):
1200
1238
        self.calls.append('sprout')
1201
 
        return _TestBranch()
 
1239
        return _TestBranch(self._transport)
1202
1240
 
1203
1241
    def copy_content_into(self, destination, revision_id=None):
1204
1242
        self.calls.append('copy_content_into')
1205
1243
 
 
1244
    def last_revision(self):
 
1245
        return _mod_revision.NULL_REVISION
 
1246
 
1206
1247
    def get_parent(self):
1207
1248
        return self._parent
1208
1249
 
 
1250
    def _get_config(self):
 
1251
        return config.TransportConfig(self._transport, 'branch.conf')
 
1252
 
1209
1253
    def set_parent(self, parent):
1210
1254
        self._parent = parent
1211
1255
 
 
1256
    def lock_read(self):
 
1257
        return lock.LogicalLockResult(self.unlock)
 
1258
 
 
1259
    def unlock(self):
 
1260
        return
 
1261
 
1212
1262
 
1213
1263
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1214
1264
 
1218
1268
        Usually, BzrDir.sprout should delegate to the branch's sprout method
1219
1269
        for part of the work.  This allows the source branch to control the
1220
1270
        choice of format for the new branch.
1221
 
        
 
1271
 
1222
1272
        There are exceptions, but this tests avoids them:
1223
1273
          - if there's no branch in the source bzrdir,
1224
1274
          - or if the stacking has been requested and the format needs to be
1245
1295
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1246
1296
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
1247
1297
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
 
1298
 
 
1299
 
 
1300
class TestBzrDirHooks(TestCaseWithMemoryTransport):
 
1301
 
 
1302
    def test_pre_open_called(self):
 
1303
        calls = []
 
1304
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
 
1305
        transport = self.get_transport('foo')
 
1306
        url = transport.base
 
1307
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
 
1308
        self.assertEqual([transport.base], [t.base for t in calls])
 
1309
 
 
1310
    def test_pre_open_actual_exceptions_raised(self):
 
1311
        count = [0]
 
1312
        def fail_once(transport):
 
1313
            count[0] += 1
 
1314
            if count[0] == 1:
 
1315
                raise errors.BzrError("fail")
 
1316
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
 
1317
        transport = self.get_transport('foo')
 
1318
        url = transport.base
 
1319
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
 
1320
        self.assertEqual('fail', err._preformatted_string)
 
1321
 
 
1322
    def test_post_repo_init(self):
 
1323
        from bzrlib.bzrdir import RepoInitHookParams
 
1324
        calls = []
 
1325
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1326
            calls.append, None)
 
1327
        self.make_repository('foo')
 
1328
        self.assertLength(1, calls)
 
1329
        params = calls[0]
 
1330
        self.assertIsInstance(params, RepoInitHookParams)
 
1331
        self.assertTrue(hasattr(params, 'bzrdir'))
 
1332
        self.assertTrue(hasattr(params, 'repository'))
 
1333
 
 
1334
    def test_post_repo_init_hook_repr(self):
 
1335
        param_reprs = []
 
1336
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1337
            lambda params: param_reprs.append(repr(params)), None)
 
1338
        self.make_repository('foo')
 
1339
        self.assertLength(1, param_reprs)
 
1340
        param_repr = param_reprs[0]
 
1341
        self.assertStartsWith(param_repr, '<RepoInitHookParams for ')
 
1342
 
 
1343
 
 
1344
class TestGenerateBackupName(TestCaseWithMemoryTransport):
 
1345
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
 
1346
    # moved to per_bzrdir or per_transport for better coverage ?
 
1347
    # -- vila 20100909
 
1348
 
 
1349
    def setUp(self):
 
1350
        super(TestGenerateBackupName, self).setUp()
 
1351
        self._transport = self.get_transport()
 
1352
        bzrdir.BzrDir.create(self.get_url(),
 
1353
            possible_transports=[self._transport])
 
1354
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
 
1355
 
 
1356
    def test_deprecated_generate_backup_name(self):
 
1357
        res = self.applyDeprecated(
 
1358
                symbol_versioning.deprecated_in((2, 3, 0)),
 
1359
                self._bzrdir.generate_backup_name, 'whatever')
 
1360
 
 
1361
    def test_new(self):
 
1362
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
 
1363
 
 
1364
    def test_exiting(self):
 
1365
        self._transport.put_bytes("a.~1~", "some content")
 
1366
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
 
1367