~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_controldir/test_controldir.py

  • Committer: Jelmer Vernooij
  • Date: 2011-08-19 22:34:02 UTC
  • mto: This revision was merged to the branch mainline in revision 6089.
  • Revision ID: jelmer@samba.org-20110819223402-wjywqb0fa1xxx522
Use get_transport_from_{url,path} in more places.

Show diffs side-by-side

added added

removed removed

Lines of Context:
7
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for control directory implementations - tests a controldir format."""
18
 
 
19
 
from itertools import izip
20
 
 
21
 
import bzrlib.branch
22
 
from bzrlib import (
23
 
    bzrdir,
24
 
    check,
25
 
    controldir,
26
 
    errors,
27
 
    gpg,
28
 
    osutils,
29
 
    transport,
30
 
    ui,
31
 
    urlutils,
32
 
    workingtree,
33
 
    )
34
 
import bzrlib.revision
35
 
from bzrlib.tests import (
36
 
    fixtures,
37
 
    ChrootedTestCase,
38
 
    TestNotApplicable,
39
 
    TestSkipped,
40
 
    )
41
 
from bzrlib.tests.per_controldir import TestCaseWithControlDir
42
 
from bzrlib.transport.local import LocalTransport
43
 
from bzrlib.ui import (
44
 
    CannedInputUIFactory,
45
 
    )
46
 
from bzrlib.remote import (
47
 
    RemoteBzrDir,
48
 
    RemoteBzrDirFormat,
49
 
    RemoteRepository,
50
 
    )
51
 
 
52
 
 
53
 
class TestControlDir(TestCaseWithControlDir):
54
 
 
55
 
    def skipIfNoWorkingTree(self, a_bzrdir):
56
 
        """Raises TestSkipped if a_bzrdir doesn't have a working tree.
57
 
 
58
 
        If the bzrdir does have a workingtree, this is a no-op.
59
 
        """
60
 
        try:
61
 
            a_bzrdir.open_workingtree()
62
 
        except (errors.NotLocalUrl, errors.NoWorkingTree):
63
 
            raise TestSkipped("bzrdir on transport %r has no working tree"
64
 
                              % a_bzrdir.transport)
65
 
 
66
 
    def openWorkingTreeIfLocal(self, a_bzrdir):
67
 
        """If a_bzrdir is on a local transport, call open_workingtree() on it.
68
 
        """
69
 
        if not isinstance(a_bzrdir.root_transport, LocalTransport):
70
 
            # it's not local, but that's ok
71
 
            return
72
 
        a_bzrdir.open_workingtree()
73
 
 
74
 
    def createWorkingTreeOrSkip(self, a_bzrdir):
75
 
        """Create a working tree on a_bzrdir, or raise TestSkipped.
76
 
 
77
 
        A simple wrapper for create_workingtree that translates NotLocalUrl into
78
 
        TestSkipped.  Returns the newly created working tree.
79
 
        """
80
 
        try:
81
 
            return a_bzrdir.create_workingtree()
82
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
83
 
            raise TestSkipped("cannot make working tree with transport %r"
84
 
                              % a_bzrdir.transport)
85
 
 
86
 
    def sproutOrSkip(self, from_bzrdir, to_url, revision_id=None,
87
 
                     force_new_repo=False, accelerator_tree=None,
88
 
                     create_tree_if_local=True):
89
 
        """Sprout from_bzrdir into to_url, or raise TestSkipped.
90
 
 
91
 
        A simple wrapper for from_bzrdir.sprout that translates NotLocalUrl into
92
 
        TestSkipped.  Returns the newly sprouted bzrdir.
93
 
        """
94
 
        to_transport = transport.get_transport(to_url)
95
 
        if not isinstance(to_transport, LocalTransport):
96
 
            raise TestSkipped('Cannot sprout to remote bzrdirs.')
97
 
        target = from_bzrdir.sprout(to_url, revision_id=revision_id,
98
 
                                    force_new_repo=force_new_repo,
99
 
                                    possible_transports=[to_transport],
100
 
                                    accelerator_tree=accelerator_tree,
101
 
                                    create_tree_if_local=create_tree_if_local)
102
 
        return target
103
 
 
104
 
    def test_create_null_workingtree(self):
105
 
        dir = self.make_bzrdir('dir1')
106
 
        dir.create_repository()
107
 
        dir.create_branch()
108
 
        try:
109
 
            wt = dir.create_workingtree(revision_id=bzrlib.revision.NULL_REVISION)
110
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
111
 
            raise TestSkipped("cannot make working tree with transport %r"
112
 
                              % dir.transport)
113
 
        self.assertEqual([], wt.get_parent_ids())
114
 
 
115
 
    def test_destroy_workingtree(self):
116
 
        tree = self.make_branch_and_tree('tree')
117
 
        self.build_tree(['tree/file'])
118
 
        tree.add('file')
119
 
        tree.commit('first commit')
120
 
        bzrdir = tree.bzrdir
121
 
        try:
122
 
            bzrdir.destroy_workingtree()
123
 
        except errors.UnsupportedOperation:
124
 
            raise TestSkipped('Format does not support destroying tree')
125
 
        self.assertPathDoesNotExist('tree/file')
126
 
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
127
 
        bzrdir.create_workingtree()
128
 
        self.assertPathExists('tree/file')
129
 
        bzrdir.destroy_workingtree_metadata()
130
 
        self.assertPathExists('tree/file')
131
 
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
132
 
 
133
 
    def test_destroy_branch(self):
134
 
        branch = self.make_branch('branch')
135
 
        bzrdir = branch.bzrdir
136
 
        try:
137
 
            bzrdir.destroy_branch()
138
 
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
139
 
            raise TestNotApplicable('Format does not support destroying branch')
140
 
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
141
 
        bzrdir.create_branch()
142
 
        bzrdir.open_branch()
143
 
 
144
 
    def test_destroy_repository(self):
145
 
        repo = self.make_repository('repository')
146
 
        bzrdir = repo.bzrdir
147
 
        try:
148
 
            bzrdir.destroy_repository()
149
 
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
150
 
            raise TestNotApplicable('Format does not support destroying'
151
 
                                    ' repository')
152
 
        self.assertRaises(errors.NoRepositoryPresent, bzrdir.open_repository)
153
 
        bzrdir.create_repository()
154
 
        bzrdir.open_repository()
155
 
 
156
 
    def test_open_workingtree_raises_no_working_tree(self):
157
 
        """ControlDir.open_workingtree() should raise NoWorkingTree (rather than
158
 
        e.g. NotLocalUrl) if there is no working tree.
159
 
        """
160
 
        dir = self.make_bzrdir('source')
161
 
        vfs_dir = bzrdir.BzrDir.open(self.get_vfs_only_url('source'))
162
 
        if vfs_dir.has_workingtree():
163
 
            # This ControlDir format doesn't support ControlDirs without
164
 
            # working trees, so this test is irrelevant.
165
 
            return
166
 
        self.assertRaises(errors.NoWorkingTree, dir.open_workingtree)
167
 
 
168
 
    def test_clone_bzrdir_repository_under_shared(self):
169
 
        tree = self.make_branch_and_tree('commit_tree')
170
 
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
171
 
        tree.add('foo')
172
 
        tree.commit('revision 1', rev_id='1')
173
 
        dir = self.make_bzrdir('source')
174
 
        repo = dir.create_repository()
175
 
        repo.fetch(tree.branch.repository)
176
 
        self.assertTrue(repo.has_revision('1'))
177
 
        try:
178
 
            self.make_repository('target', shared=True)
179
 
        except errors.IncompatibleFormat:
180
 
            return
181
 
        target = dir.clone(self.get_url('target/child'))
182
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
183
 
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
184
 
 
185
 
    def test_clone_bzrdir_repository_branch_both_under_shared(self):
186
 
        # Create a shared repository
187
 
        try:
188
 
            shared_repo = self.make_repository('shared', shared=True)
189
 
        except errors.IncompatibleFormat:
190
 
            return
191
 
        # Make a branch, 'commit_tree', and working tree outside of the shared
192
 
        # repository, and commit some revisions to it.
193
 
        tree = self.make_branch_and_tree('commit_tree')
194
 
        self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
195
 
        tree.add('foo')
196
 
        tree.commit('revision 1', rev_id='1')
197
 
        tree.bzrdir.open_branch().generate_revision_history(
198
 
            bzrlib.revision.NULL_REVISION)
199
 
        tree.set_parent_trees([])
200
 
        tree.commit('revision 2', rev_id='2')
201
 
        # Copy the content (i.e. revisions) from the 'commit_tree' branch's
202
 
        # repository into the shared repository.
203
 
        tree.branch.repository.copy_content_into(shared_repo)
204
 
        # Make a branch 'source' inside the shared repository.
205
 
        dir = self.make_bzrdir('shared/source')
206
 
        dir.create_branch()
207
 
        # Clone 'source' to 'target', also inside the shared repository.
208
 
        target = dir.clone(self.get_url('shared/target'))
209
 
        # 'source', 'target', and the shared repo all have distinct bzrdirs.
210
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
211
 
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
212
 
        # The shared repository will contain revisions from the 'commit_tree'
213
 
        # repository, even revisions that are not part of the history of the
214
 
        # 'commit_tree' branch.
215
 
        self.assertTrue(shared_repo.has_revision('1'))
216
 
 
217
 
    def test_clone_bzrdir_repository_branch_only_source_under_shared(self):
218
 
        try:
219
 
            shared_repo = self.make_repository('shared', shared=True)
220
 
        except errors.IncompatibleFormat:
221
 
            return
222
 
        tree = self.make_branch_and_tree('commit_tree')
223
 
        self.build_tree(['commit_tree/foo'])
224
 
        tree.add('foo')
225
 
        tree.commit('revision 1', rev_id='1')
226
 
        tree.branch.bzrdir.open_branch().generate_revision_history(
227
 
            bzrlib.revision.NULL_REVISION)
228
 
        tree.set_parent_trees([])
229
 
        tree.commit('revision 2', rev_id='2')
230
 
        tree.branch.repository.copy_content_into(shared_repo)
231
 
        if shared_repo.make_working_trees():
232
 
            shared_repo.set_make_working_trees(False)
233
 
            self.assertFalse(shared_repo.make_working_trees())
234
 
        self.assertTrue(shared_repo.has_revision('1'))
235
 
        dir = self.make_bzrdir('shared/source')
236
 
        dir.create_branch()
237
 
        target = dir.clone(self.get_url('target'))
238
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
239
 
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
240
 
        branch = target.open_branch()
241
 
        self.assertTrue(branch.repository.has_revision('1'))
242
 
        self.assertFalse(branch.repository.make_working_trees())
243
 
        self.assertTrue(branch.repository.is_shared())
244
 
 
245
 
    def test_clone_bzrdir_repository_revision(self):
246
 
        # test for revision limiting, [smoke test, not corner case checks].
247
 
        # make a repository with some revisions,
248
 
        # and clone it with a revision limit.
249
 
        #
250
 
        tree = self.make_branch_and_tree('commit_tree')
251
 
        self.build_tree(['commit_tree/foo'])
252
 
        tree.add('foo')
253
 
        tree.commit('revision 1', rev_id='1')
254
 
        tree.branch.bzrdir.open_branch().generate_revision_history(
255
 
            bzrlib.revision.NULL_REVISION)
256
 
        tree.set_parent_trees([])
257
 
        tree.commit('revision 2', rev_id='2')
258
 
        source = self.make_repository('source')
259
 
        tree.branch.repository.copy_content_into(source)
260
 
        dir = source.bzrdir
261
 
        target = dir.clone(self.get_url('target'), revision_id='2')
262
 
        raise TestSkipped('revision limiting not strict yet')
263
 
 
264
 
    def test_clone_bzrdir_branch_and_repo_fixed_user_id(self):
265
 
        # Bug #430868 is about an email containing '.sig'
266
 
        self.overrideEnv('BZR_EMAIL', 'murphy@host.sighup.org')
267
 
        tree = self.make_branch_and_tree('commit_tree')
268
 
        self.build_tree(['commit_tree/foo'])
269
 
        tree.add('foo')
270
 
        rev1 = tree.commit('revision 1')
271
 
        tree_repo = tree.branch.repository
272
 
        tree_repo.lock_write()
273
 
        tree_repo.start_write_group()
274
 
        tree_repo.sign_revision(rev1, gpg.LoopbackGPGStrategy(None))
275
 
        tree_repo.commit_write_group()
276
 
        tree_repo.unlock()
277
 
        target = self.make_branch('target')
278
 
        tree.branch.repository.copy_content_into(target.repository)
279
 
        tree.branch.copy_content_into(target)
280
 
        self.assertTrue(target.repository.has_revision(rev1))
281
 
        self.assertEqual(
282
 
            tree_repo.get_signature_text(rev1),
283
 
            target.repository.get_signature_text(rev1))
284
 
 
285
 
    def test_clone_bzrdir_branch_and_repo_into_shared_repo(self):
286
 
        # by default cloning into a shared repo uses the shared repo.
287
 
        tree = self.make_branch_and_tree('commit_tree')
288
 
        self.build_tree(['commit_tree/foo'])
289
 
        tree.add('foo')
290
 
        tree.commit('revision 1')
291
 
        source = self.make_branch('source')
292
 
        tree.branch.repository.copy_content_into(source.repository)
293
 
        tree.branch.copy_content_into(source)
294
 
        try:
295
 
            self.make_repository('target', shared=True)
296
 
        except errors.IncompatibleFormat:
297
 
            return
298
 
        dir = source.bzrdir
299
 
        target = dir.clone(self.get_url('target/child'))
300
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
301
 
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
302
 
        self.assertEqual(source.revision_history(),
303
 
                         target.open_branch().revision_history())
304
 
 
305
 
    def test_clone_bzrdir_branch_revision(self):
306
 
        # test for revision limiting, [smoke test, not corner case checks].
307
 
        # make a branch with some revisions,
308
 
        # and clone it with a revision limit.
309
 
        #
310
 
        tree = self.make_branch_and_tree('commit_tree')
311
 
        self.build_tree(['commit_tree/foo'])
312
 
        tree.add('foo')
313
 
        tree.commit('revision 1', rev_id='1')
314
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
315
 
        source = self.make_branch('source')
316
 
        tree.branch.repository.copy_content_into(source.repository)
317
 
        tree.branch.copy_content_into(source)
318
 
        dir = source.bzrdir
319
 
        target = dir.clone(self.get_url('target'), revision_id='1')
320
 
        self.assertEqual('1', target.open_branch().last_revision())
321
 
 
322
 
    def test_clone_on_transport_preserves_repo_format(self):
323
 
        if self.bzrdir_format == bzrdir.format_registry.make_bzrdir('default'):
324
 
            format = 'knit'
325
 
        else:
326
 
            format = None
327
 
        source_branch = self.make_branch('source', format=format)
328
 
        # Ensure no format data is cached
329
 
        a_dir = bzrlib.branch.Branch.open_from_transport(
330
 
            self.get_transport('source')).bzrdir
331
 
        target_transport = self.get_transport('target')
332
 
        target_bzrdir = a_dir.clone_on_transport(target_transport)
333
 
        target_repo = target_bzrdir.open_repository()
334
 
        source_branch = bzrlib.branch.Branch.open(
335
 
            self.get_vfs_only_url('source'))
336
 
        if isinstance(target_repo, RemoteRepository):
337
 
            target_repo._ensure_real()
338
 
            target_repo = target_repo._real_repository
339
 
        self.assertEqual(target_repo._format, source_branch.repository._format)
340
 
 
341
 
    def test_clone_bzrdir_tree_revision(self):
342
 
        # test for revision limiting, [smoke test, not corner case checks].
343
 
        # make a tree with a revision with a last-revision
344
 
        # and clone it with a revision limit.
345
 
        # This smoke test just checks the revision-id is right. Tree specific
346
 
        # tests will check corner cases.
347
 
        tree = self.make_branch_and_tree('source')
348
 
        self.build_tree(['source/foo'])
349
 
        tree.add('foo')
350
 
        tree.commit('revision 1', rev_id='1')
351
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
352
 
        dir = tree.bzrdir
353
 
        target = dir.clone(self.get_url('target'), revision_id='1')
354
 
        self.skipIfNoWorkingTree(target)
355
 
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
356
 
 
357
 
    def test_clone_bzrdir_into_notrees_repo(self):
358
 
        """Cloning into a no-trees repo should not create a working tree"""
359
 
        tree = self.make_branch_and_tree('source')
360
 
        self.build_tree(['source/foo'])
361
 
        tree.add('foo')
362
 
        tree.commit('revision 1')
363
 
 
364
 
        try:
365
 
            repo = self.make_repository('repo', shared=True)
366
 
        except errors.IncompatibleFormat:
367
 
            raise TestNotApplicable('must support shared repositories')
368
 
        if repo.make_working_trees():
369
 
            repo.set_make_working_trees(False)
370
 
            self.assertFalse(repo.make_working_trees())
371
 
 
372
 
        dir = tree.bzrdir
373
 
        a_dir = dir.clone(self.get_url('repo/a'))
374
 
        a_dir.open_branch()
375
 
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
376
 
 
377
 
    def test_clone_respects_stacked(self):
378
 
        branch = self.make_branch('parent')
379
 
        child_transport = self.get_transport('child')
380
 
        child = branch.bzrdir.clone_on_transport(child_transport,
381
 
                                                 stacked_on=branch.base)
382
 
        self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)
383
 
 
384
 
    def test_get_branch_reference_on_reference(self):
385
 
        """get_branch_reference should return the right url."""
386
 
        referenced_branch = self.make_branch('referenced')
387
 
        dir = self.make_bzrdir('source')
388
 
        try:
389
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
390
 
                target_branch=referenced_branch)
391
 
        except errors.IncompatibleFormat:
392
 
            # this is ok too, not all formats have to support references.
393
 
            return
394
 
        self.assertEqual(referenced_branch.bzrdir.root_transport.abspath('') + '/',
395
 
            dir.get_branch_reference())
396
 
 
397
 
    def test_get_branch_reference_on_non_reference(self):
398
 
        """get_branch_reference should return None for non-reference branches."""
399
 
        branch = self.make_branch('referenced')
400
 
        self.assertEqual(None, branch.bzrdir.get_branch_reference())
401
 
 
402
 
    def test_get_branch_reference_no_branch(self):
403
 
        """get_branch_reference should not mask NotBranchErrors."""
404
 
        dir = self.make_bzrdir('source')
405
 
        if dir.has_branch():
406
 
            # this format does not support branchless bzrdirs.
407
 
            return
408
 
        self.assertRaises(errors.NotBranchError, dir.get_branch_reference)
409
 
 
410
 
    def test_sprout_bzrdir_empty(self):
411
 
        dir = self.make_bzrdir('source')
412
 
        target = dir.sprout(self.get_url('target'))
413
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
414
 
        # creates a new repository branch and tree
415
 
        target.open_repository()
416
 
        target.open_branch()
417
 
        self.openWorkingTreeIfLocal(target)
418
 
 
419
 
    def test_sprout_bzrdir_empty_under_shared_repo(self):
420
 
        # sprouting an empty dir into a repo uses the repo
421
 
        dir = self.make_bzrdir('source')
422
 
        try:
423
 
            self.make_repository('target', shared=True)
424
 
        except errors.IncompatibleFormat:
425
 
            return
426
 
        target = dir.sprout(self.get_url('target/child'))
427
 
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
428
 
        target.open_branch()
429
 
        try:
430
 
            target.open_workingtree()
431
 
        except errors.NoWorkingTree:
432
 
            # Some bzrdirs can never have working trees.
433
 
            self.assertFalse(target._format.supports_workingtrees)
434
 
 
435
 
    def test_sprout_bzrdir_empty_under_shared_repo_force_new(self):
436
 
        # the force_new_repo parameter should force use of a new repo in an empty
437
 
        # bzrdir's sprout logic
438
 
        dir = self.make_bzrdir('source')
439
 
        try:
440
 
            self.make_repository('target', shared=True)
441
 
        except errors.IncompatibleFormat:
442
 
            return
443
 
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
444
 
        target.open_repository()
445
 
        target.open_branch()
446
 
        self.openWorkingTreeIfLocal(target)
447
 
 
448
 
    def test_sprout_bzrdir_with_repository_to_shared(self):
449
 
        tree = self.make_branch_and_tree('commit_tree')
450
 
        self.build_tree(['commit_tree/foo'])
451
 
        tree.add('foo')
452
 
        tree.commit('revision 1', rev_id='1')
453
 
        tree.bzrdir.open_branch().generate_revision_history(
454
 
            bzrlib.revision.NULL_REVISION)
455
 
        tree.set_parent_trees([])
456
 
        tree.commit('revision 2', rev_id='2')
457
 
        source = self.make_repository('source')
458
 
        tree.branch.repository.copy_content_into(source)
459
 
        dir = source.bzrdir
460
 
        try:
461
 
            shared_repo = self.make_repository('target', shared=True)
462
 
        except errors.IncompatibleFormat:
463
 
            return
464
 
        target = dir.sprout(self.get_url('target/child'))
465
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
466
 
        self.assertTrue(shared_repo.has_revision('1'))
467
 
 
468
 
    def test_sprout_bzrdir_repository_branch_both_under_shared(self):
469
 
        try:
470
 
            shared_repo = self.make_repository('shared', shared=True)
471
 
        except errors.IncompatibleFormat:
472
 
            return
473
 
        tree = self.make_branch_and_tree('commit_tree')
474
 
        self.build_tree(['commit_tree/foo'])
475
 
        tree.add('foo')
476
 
        tree.commit('revision 1', rev_id='1')
477
 
        tree.bzrdir.open_branch().generate_revision_history(
478
 
            bzrlib.revision.NULL_REVISION)
479
 
        tree.set_parent_trees([])
480
 
        tree.commit('revision 2', rev_id='2')
481
 
        tree.branch.repository.copy_content_into(shared_repo)
482
 
        dir = self.make_bzrdir('shared/source')
483
 
        dir.create_branch()
484
 
        target = dir.sprout(self.get_url('shared/target'))
485
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
486
 
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
487
 
        self.assertTrue(shared_repo.has_revision('1'))
488
 
 
489
 
    def test_sprout_bzrdir_repository_branch_only_source_under_shared(self):
490
 
        try:
491
 
            shared_repo = self.make_repository('shared', shared=True)
492
 
        except errors.IncompatibleFormat:
493
 
            return
494
 
        tree = self.make_branch_and_tree('commit_tree')
495
 
        self.build_tree(['commit_tree/foo'])
496
 
        tree.add('foo')
497
 
        tree.commit('revision 1', rev_id='1')
498
 
        tree.bzrdir.open_branch().generate_revision_history(
499
 
            bzrlib.revision.NULL_REVISION)
500
 
        tree.set_parent_trees([])
501
 
        tree.commit('revision 2', rev_id='2')
502
 
        tree.branch.repository.copy_content_into(shared_repo)
503
 
        if shared_repo.make_working_trees():
504
 
            shared_repo.set_make_working_trees(False)
505
 
            self.assertFalse(shared_repo.make_working_trees())
506
 
        self.assertTrue(shared_repo.has_revision('1'))
507
 
        dir = self.make_bzrdir('shared/source')
508
 
        dir.create_branch()
509
 
        target = dir.sprout(self.get_url('target'))
510
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
511
 
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
512
 
        branch = target.open_branch()
513
 
        # The sprouted bzrdir has a branch, so only revisions referenced by
514
 
        # that branch are copied, rather than the whole repository.  It's an
515
 
        # empty branch, so none are copied.
516
 
        self.assertEqual([], branch.repository.all_revision_ids())
517
 
        if branch.bzrdir._format.supports_workingtrees:
518
 
            self.assertTrue(branch.repository.make_working_trees())
519
 
        self.assertFalse(branch.repository.is_shared())
520
 
 
521
 
    def test_sprout_bzrdir_repository_under_shared_force_new_repo(self):
522
 
        tree = self.make_branch_and_tree('commit_tree')
523
 
        self.build_tree(['commit_tree/foo'])
524
 
        tree.add('foo')
525
 
        tree.commit('revision 1', rev_id='1')
526
 
        tree.bzrdir.open_branch().generate_revision_history(
527
 
            bzrlib.revision.NULL_REVISION)
528
 
        tree.set_parent_trees([])
529
 
        tree.commit('revision 2', rev_id='2')
530
 
        source = self.make_repository('source')
531
 
        tree.branch.repository.copy_content_into(source)
532
 
        dir = source.bzrdir
533
 
        try:
534
 
            shared_repo = self.make_repository('target', shared=True)
535
 
        except errors.IncompatibleFormat:
536
 
            return
537
 
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
538
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
539
 
        self.assertFalse(shared_repo.has_revision('1'))
540
 
 
541
 
    def test_sprout_bzrdir_repository_revision(self):
542
 
        # test for revision limiting, [smoke test, not corner case checks].
543
 
        # make a repository with some revisions,
544
 
        # and sprout it with a revision limit.
545
 
        #
546
 
        tree = self.make_branch_and_tree('commit_tree')
547
 
        self.build_tree(['commit_tree/foo'])
548
 
        tree.add('foo')
549
 
        tree.commit('revision 1', rev_id='1')
550
 
        br = tree.bzrdir.open_branch()
551
 
        br.set_last_revision_info(0, bzrlib.revision.NULL_REVISION)
552
 
        tree.set_parent_trees([])
553
 
        tree.commit('revision 2', rev_id='2')
554
 
        source = self.make_repository('source')
555
 
        tree.branch.repository.copy_content_into(source)
556
 
        dir = source.bzrdir
557
 
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='2')
558
 
        raise TestSkipped('revision limiting not strict yet')
559
 
 
560
 
    def test_sprout_bzrdir_branch_and_repo_shared(self):
561
 
        # sprouting a branch with a repo into a shared repo uses the shared
562
 
        # repo
563
 
        tree = self.make_branch_and_tree('commit_tree')
564
 
        self.build_tree(['commit_tree/foo'])
565
 
        tree.add('foo')
566
 
        tree.commit('revision 1', rev_id='1')
567
 
        source = self.make_branch('source')
568
 
        tree.branch.repository.copy_content_into(source.repository)
569
 
        tree.bzrdir.open_branch().copy_content_into(source)
570
 
        dir = source.bzrdir
571
 
        try:
572
 
            shared_repo = self.make_repository('target', shared=True)
573
 
        except errors.IncompatibleFormat:
574
 
            return
575
 
        target = dir.sprout(self.get_url('target/child'))
576
 
        self.assertTrue(shared_repo.has_revision('1'))
577
 
 
578
 
    def test_sprout_bzrdir_branch_and_repo_shared_force_new_repo(self):
579
 
        # sprouting a branch with a repo into a shared repo uses the shared
580
 
        # repo
581
 
        tree = self.make_branch_and_tree('commit_tree')
582
 
        self.build_tree(['commit_tree/foo'])
583
 
        tree.add('foo')
584
 
        tree.commit('revision 1', rev_id='1')
585
 
        source = self.make_branch('source')
586
 
        tree.branch.repository.copy_content_into(source.repository)
587
 
        tree.bzrdir.open_branch().copy_content_into(source)
588
 
        dir = source.bzrdir
589
 
        try:
590
 
            shared_repo = self.make_repository('target', shared=True)
591
 
        except errors.IncompatibleFormat:
592
 
            return
593
 
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
594
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
595
 
        self.assertFalse(shared_repo.has_revision('1'))
596
 
 
597
 
    def test_sprout_bzrdir_branch_reference(self):
598
 
        # sprouting should create a repository if needed and a sprouted branch.
599
 
        referenced_branch = self.make_branch('referenced')
600
 
        dir = self.make_bzrdir('source')
601
 
        try:
602
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
603
 
                target_branch=referenced_branch)
604
 
        except errors.IncompatibleFormat:
605
 
            # this is ok too, not all formats have to support references.
606
 
            return
607
 
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
608
 
        target = dir.sprout(self.get_url('target'))
609
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
610
 
        # we want target to have a branch that is in-place.
611
 
        self.assertEqual(target, target.open_branch().bzrdir)
612
 
        # and as we dont support repositories being detached yet, a repo in
613
 
        # place
614
 
        target.open_repository()
615
 
 
616
 
    def test_sprout_bzrdir_branch_reference_shared(self):
617
 
        # sprouting should create a repository if needed and a sprouted branch.
618
 
        referenced_tree = self.make_branch_and_tree('referenced')
619
 
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
620
 
        dir = self.make_bzrdir('source')
621
 
        try:
622
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
623
 
                target_branch=referenced_tree.branch)
624
 
        except errors.IncompatibleFormat:
625
 
            # this is ok too, not all formats have to support references.
626
 
            return
627
 
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
628
 
        try:
629
 
            shared_repo = self.make_repository('target', shared=True)
630
 
        except errors.IncompatibleFormat:
631
 
            return
632
 
        target = dir.sprout(self.get_url('target/child'))
633
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
634
 
        # we want target to have a branch that is in-place.
635
 
        self.assertEqual(target, target.open_branch().bzrdir)
636
 
        # and we want no repository as the target is shared
637
 
        self.assertRaises(errors.NoRepositoryPresent,
638
 
                          target.open_repository)
639
 
        # and we want revision '1' in the shared repo
640
 
        self.assertTrue(shared_repo.has_revision('1'))
641
 
 
642
 
    def test_sprout_bzrdir_branch_reference_shared_force_new_repo(self):
643
 
        # sprouting should create a repository if needed and a sprouted branch.
644
 
        referenced_tree = self.make_branch_and_tree('referenced')
645
 
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
646
 
        dir = self.make_bzrdir('source')
647
 
        try:
648
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
649
 
                target_branch=referenced_tree.branch)
650
 
        except errors.IncompatibleFormat:
651
 
            # this is ok too, not all formats have to support references.
652
 
            return
653
 
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
654
 
        try:
655
 
            shared_repo = self.make_repository('target', shared=True)
656
 
        except errors.IncompatibleFormat:
657
 
            return
658
 
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
659
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
660
 
        # we want target to have a branch that is in-place.
661
 
        self.assertEqual(target, target.open_branch().bzrdir)
662
 
        # and we want revision '1' in the new repo
663
 
        self.assertTrue(target.open_repository().has_revision('1'))
664
 
        # but not the shared one
665
 
        self.assertFalse(shared_repo.has_revision('1'))
666
 
 
667
 
    def test_sprout_bzrdir_branch_revision(self):
668
 
        # test for revision limiting, [smoke test, not corner case checks].
669
 
        # make a repository with some revisions,
670
 
        # and sprout it with a revision limit.
671
 
        #
672
 
        tree = self.make_branch_and_tree('commit_tree')
673
 
        self.build_tree(['commit_tree/foo'])
674
 
        tree.add('foo')
675
 
        tree.commit('revision 1', rev_id='1')
676
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
677
 
        source = self.make_branch('source')
678
 
        tree.branch.repository.copy_content_into(source.repository)
679
 
        tree.bzrdir.open_branch().copy_content_into(source)
680
 
        dir = source.bzrdir
681
 
        target = dir.sprout(self.get_url('target'), revision_id='1')
682
 
        self.assertEqual('1', target.open_branch().last_revision())
683
 
 
684
 
    def test_sprout_bzrdir_branch_with_tags(self):
685
 
        # when sprouting a branch all revisions named in the tags are copied
686
 
        # too.
687
 
        builder = self.make_branch_builder('source')
688
 
        source = fixtures.build_branch_with_non_ancestral_rev(builder)
689
 
        try:
690
 
            source.tags.set_tag('tag-a', 'rev-2')
691
 
        except errors.TagsNotSupported:
692
 
            raise TestNotApplicable('Branch format does not support tags.')
693
 
        source.get_config().set_user_option('branch.fetch_tags', 'True')
694
 
        # Now source has a tag not in its ancestry.  Sprout its controldir.
695
 
        dir = source.bzrdir
696
 
        target = dir.sprout(self.get_url('target'))
697
 
        # The tag is present, and so is its revision.
698
 
        new_branch = target.open_branch()
699
 
        self.assertEqual('rev-2', new_branch.tags.lookup_tag('tag-a'))
700
 
        new_branch.repository.get_revision('rev-2')
701
 
 
702
 
    def test_sprout_bzrdir_branch_with_absent_tag(self):
703
 
        # tags referencing absent revisions are copied (and those absent
704
 
        # revisions do not prevent the sprout.)
705
 
        builder = self.make_branch_builder('source')
706
 
        builder.build_commit(message="Rev 1", rev_id='rev-1')
707
 
        source = builder.get_branch()
708
 
        try:
709
 
            source.tags.set_tag('tag-a', 'missing-rev')
710
 
        except errors.TagsNotSupported:
711
 
            raise TestNotApplicable('Branch format does not support tags.')
712
 
        # Now source has a tag pointing to an absent revision.  Sprout its
713
 
        # controldir.
714
 
        dir = source.bzrdir
715
 
        target = dir.sprout(self.get_url('target'))
716
 
        # The tag is present in the target
717
 
        new_branch = target.open_branch()
718
 
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
719
 
 
720
 
    def test_sprout_bzrdir_passing_source_branch_with_absent_tag(self):
721
 
        # tags referencing absent revisions are copied (and those absent
722
 
        # revisions do not prevent the sprout.)
723
 
        builder = self.make_branch_builder('source')
724
 
        builder.build_commit(message="Rev 1", rev_id='rev-1')
725
 
        source = builder.get_branch()
726
 
        try:
727
 
            source.tags.set_tag('tag-a', 'missing-rev')
728
 
        except errors.TagsNotSupported:
729
 
            raise TestNotApplicable('Branch format does not support tags.')
730
 
        # Now source has a tag pointing to an absent revision.  Sprout its
731
 
        # controldir.
732
 
        dir = source.bzrdir
733
 
        target = dir.sprout(self.get_url('target'), source_branch=source)
734
 
        # The tag is present in the target
735
 
        new_branch = target.open_branch()
736
 
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
737
 
 
738
 
    def test_sprout_bzrdir_passing_rev_not_source_branch_copies_tags(self):
739
 
        # dir.sprout(..., revision_id='rev1') copies rev1, and all the tags of
740
 
        # the branch at that bzrdir, the ancestry of all of those, but no other
741
 
        # revs (not even the tip of the source branch).
742
 
        builder = self.make_branch_builder('source')
743
 
        builder.build_commit(message="Base", rev_id='base-rev')
744
 
        # Make three parallel lines of ancestry off this base.
745
 
        source = builder.get_branch()
746
 
        builder.build_commit(message="Rev A1", rev_id='rev-a1')
747
 
        builder.build_commit(message="Rev A2", rev_id='rev-a2')
748
 
        builder.build_commit(message="Rev A3", rev_id='rev-a3')
749
 
        source.set_last_revision_info(1, 'base-rev')
750
 
        builder.build_commit(message="Rev B1", rev_id='rev-b1')
751
 
        builder.build_commit(message="Rev B2", rev_id='rev-b2')
752
 
        builder.build_commit(message="Rev B3", rev_id='rev-b3')
753
 
        source.set_last_revision_info(1, 'base-rev')
754
 
        builder.build_commit(message="Rev C1", rev_id='rev-c1')
755
 
        builder.build_commit(message="Rev C2", rev_id='rev-c2')
756
 
        builder.build_commit(message="Rev C3", rev_id='rev-c3')
757
 
        # Set the branch tip to A2
758
 
        source.set_last_revision_info(3, 'rev-a2')
759
 
        try:
760
 
            # Create a tag for B2, and for an absent rev
761
 
            source.tags.set_tag('tag-non-ancestry', 'rev-b2')
762
 
            source.tags.set_tag('tag-absent', 'absent-rev')
763
 
        except errors.TagsNotSupported:
764
 
            raise TestNotApplicable('Branch format does not support tags.')
765
 
        source.get_config().set_user_option('branch.fetch_tags', 'True')
766
 
        # And ask sprout for C2
767
 
        dir = source.bzrdir
768
 
        target = dir.sprout(self.get_url('target'), revision_id='rev-c2')
769
 
        # The tags are present
770
 
        new_branch = target.open_branch()
771
 
        self.assertEqual(
772
 
            {'tag-absent': 'absent-rev', 'tag-non-ancestry': 'rev-b2'},
773
 
            new_branch.tags.get_tag_dict())
774
 
        # And the revs for A2, B2 and C2's ancestries are present, but no
775
 
        # others.
776
 
        self.assertEqual(
777
 
            ['base-rev', 'rev-b1', 'rev-b2', 'rev-c1', 'rev-c2'],
778
 
            sorted(new_branch.repository.all_revision_ids()))
779
 
 
780
 
    def test_sprout_bzrdir_tree_branch_reference(self):
781
 
        # sprouting should create a repository if needed and a sprouted branch.
782
 
        # the tree state should not be copied.
783
 
        referenced_branch = self.make_branch('referencced')
784
 
        dir = self.make_bzrdir('source')
785
 
        try:
786
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
787
 
                target_branch=referenced_branch)
788
 
        except errors.IncompatibleFormat:
789
 
            # this is ok too, not all formats have to support references.
790
 
            return
791
 
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
792
 
        tree = self.createWorkingTreeOrSkip(dir)
793
 
        self.build_tree(['source/subdir/'])
794
 
        tree.add('subdir')
795
 
        target = dir.sprout(self.get_url('target'))
796
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
797
 
        # we want target to have a branch that is in-place.
798
 
        self.assertEqual(target, target.open_branch().bzrdir)
799
 
        # and as we dont support repositories being detached yet, a repo in
800
 
        # place
801
 
        target.open_repository()
802
 
        result_tree = target.open_workingtree()
803
 
        self.assertFalse(result_tree.has_filename('subdir'))
804
 
 
805
 
    def test_sprout_bzrdir_tree_branch_reference_revision(self):
806
 
        # sprouting should create a repository if needed and a sprouted branch.
807
 
        # the tree state should not be copied but the revision changed,
808
 
        # and the likewise the new branch should be truncated too
809
 
        referenced_branch = self.make_branch('referencced')
810
 
        dir = self.make_bzrdir('source')
811
 
        try:
812
 
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
813
 
                target_branch=referenced_branch)
814
 
        except errors.IncompatibleFormat:
815
 
            # this is ok too, not all formats have to support references.
816
 
            return
817
 
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
818
 
        tree = self.createWorkingTreeOrSkip(dir)
819
 
        self.build_tree(['source/foo'])
820
 
        tree.add('foo')
821
 
        tree.commit('revision 1', rev_id='1')
822
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
823
 
        target = dir.sprout(self.get_url('target'), revision_id='1')
824
 
        self.skipIfNoWorkingTree(target)
825
 
        self.assertNotEqual(dir.transport.base, target.transport.base)
826
 
        # we want target to have a branch that is in-place.
827
 
        self.assertEqual(target, target.open_branch().bzrdir)
828
 
        # and as we dont support repositories being detached yet, a repo in
829
 
        # place
830
 
        target.open_repository()
831
 
        # we trust that the working tree sprouting works via the other tests.
832
 
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
833
 
        self.assertEqual('1', target.open_branch().last_revision())
834
 
 
835
 
    def test_sprout_bzrdir_tree_revision(self):
836
 
        # test for revision limiting, [smoke test, not corner case checks].
837
 
        # make a tree with a revision with a last-revision
838
 
        # and sprout it with a revision limit.
839
 
        # This smoke test just checks the revision-id is right. Tree specific
840
 
        # tests will check corner cases.
841
 
        tree = self.make_branch_and_tree('source')
842
 
        self.build_tree(['source/foo'])
843
 
        tree.add('foo')
844
 
        tree.commit('revision 1', rev_id='1')
845
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
846
 
        dir = tree.bzrdir
847
 
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='1')
848
 
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
849
 
 
850
 
    def test_sprout_takes_accelerator(self):
851
 
        tree = self.make_branch_and_tree('source')
852
 
        self.build_tree(['source/foo'])
853
 
        tree.add('foo')
854
 
        tree.commit('revision 1', rev_id='1')
855
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
856
 
        dir = tree.bzrdir
857
 
        target = self.sproutOrSkip(dir, self.get_url('target'),
858
 
                                   accelerator_tree=tree)
859
 
        self.assertEqual(['2'], target.open_workingtree().get_parent_ids())
860
 
 
861
 
    def test_sprout_branch_no_tree(self):
862
 
        tree = self.make_branch_and_tree('source')
863
 
        self.build_tree(['source/foo'])
864
 
        tree.add('foo')
865
 
        tree.commit('revision 1', rev_id='1')
866
 
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
867
 
        dir = tree.bzrdir
868
 
        try:
869
 
            target = dir.sprout(self.get_url('target'),
870
 
                create_tree_if_local=False)
871
 
        except errors.MustHaveWorkingTree:
872
 
            raise TestNotApplicable("control dir format requires working tree")
873
 
        self.assertPathDoesNotExist('target/foo')
874
 
        self.assertEqual(tree.branch.last_revision(),
875
 
                         target.open_branch().last_revision())
876
 
 
877
 
    def test_sprout_with_revision_id_uses_default_stack_on(self):
878
 
        # Make a branch with three commits to stack on.
879
 
        builder = self.make_branch_builder('stack-on')
880
 
        builder.start_series()
881
 
        builder.build_commit(message='Rev 1.', rev_id='rev-1')
882
 
        builder.build_commit(message='Rev 2.', rev_id='rev-2')
883
 
        builder.build_commit(message='Rev 3.', rev_id='rev-3')
884
 
        builder.finish_series()
885
 
        stack_on = builder.get_branch()
886
 
        # Make a bzrdir with a default stacking policy to stack on that branch.
887
 
        config = self.make_bzrdir('policy-dir').get_config()
888
 
        try:
889
 
            config.set_default_stack_on(self.get_url('stack-on'))
890
 
        except errors.BzrError:
891
 
            raise TestNotApplicable('Only relevant for stackable formats.')
892
 
        # Sprout the stacked-on branch into the bzrdir.
893
 
        sprouted = stack_on.bzrdir.sprout(
894
 
            self.get_url('policy-dir/sprouted'), revision_id='rev-3')
895
 
        # Not all revisions are copied into the sprouted repository.
896
 
        repo = sprouted.open_repository()
897
 
        self.addCleanup(repo.lock_read().unlock)
898
 
        self.assertEqual(None, repo.get_parent_map(['rev-1']).get('rev-1'))
899
 
 
900
 
    def test_format_initialize_find_open(self):
901
 
        # loopback test to check the current format initializes to itself.
902
 
        if not self.bzrdir_format.is_supported():
903
 
            # unsupported formats are not loopback testable
904
 
            # because the default open will not open them and
905
 
            # they may not be initializable.
906
 
            return
907
 
        # for remote formats, there must be no prior assumption about the
908
 
        # network name to use - it's possible that this may somehow have got
909
 
        # in through an unisolated test though - see
910
 
        # <https://bugs.launchpad.net/bzr/+bug/504102>
911
 
        self.assertEquals(getattr(self.bzrdir_format,
912
 
            '_network_name', None),
913
 
            None)
914
 
        # supported formats must be able to init and open
915
 
        t = self.get_transport()
916
 
        readonly_t = self.get_readonly_transport()
917
 
        made_control = self.bzrdir_format.initialize(t.base)
918
 
        self.assertIsInstance(made_control, controldir.ControlDir)
919
 
        if isinstance(self.bzrdir_format, RemoteBzrDirFormat):
920
 
            return
921
 
        self.assertEqual(self.bzrdir_format,
922
 
                         controldir.ControlDirFormat.find_format(readonly_t))
923
 
        direct_opened_dir = self.bzrdir_format.open(readonly_t)
924
 
        opened_dir = bzrdir.BzrDir.open(t.base)
925
 
        self.assertEqual(made_control._format,
926
 
                         opened_dir._format)
927
 
        self.assertEqual(direct_opened_dir._format,
928
 
                         opened_dir._format)
929
 
        self.assertIsInstance(opened_dir, controldir.ControlDir)
930
 
 
931
 
    def test_format_initialize_on_transport_ex(self):
932
 
        t = self.get_transport('dir')
933
 
        self.assertInitializeEx(t)
934
 
 
935
 
    def test_format_initialize_on_transport_ex_use_existing_dir_True(self):
936
 
        t = self.get_transport('dir')
937
 
        t.ensure_base()
938
 
        self.assertInitializeEx(t, use_existing_dir=True)
939
 
 
940
 
    def test_format_initialize_on_transport_ex_use_existing_dir_False(self):
941
 
        if not self.bzrdir_format.is_supported():
942
 
            # Not initializable - not a failure either.
943
 
            return
944
 
        t = self.get_transport('dir')
945
 
        t.ensure_base()
946
 
        self.assertRaises(errors.FileExists,
947
 
            self.bzrdir_format.initialize_on_transport_ex, t,
948
 
            use_existing_dir=False)
949
 
 
950
 
    def test_format_initialize_on_transport_ex_create_prefix_True(self):
951
 
        t = self.get_transport('missing/dir')
952
 
        self.assertInitializeEx(t, create_prefix=True)
953
 
 
954
 
    def test_format_initialize_on_transport_ex_create_prefix_False(self):
955
 
        if not self.bzrdir_format.is_supported():
956
 
            # Not initializable - not a failure either.
957
 
            return
958
 
        t = self.get_transport('missing/dir')
959
 
        self.assertRaises(errors.NoSuchFile, self.assertInitializeEx, t,
960
 
            create_prefix=False)
961
 
 
962
 
    def test_format_initialize_on_transport_ex_force_new_repo_True(self):
963
 
        t = self.get_transport('repo')
964
 
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
965
 
        repo_name = repo_fmt.repository_format.network_name()
966
 
        repo = repo_fmt.initialize_on_transport_ex(t,
967
 
            repo_format_name=repo_name, shared_repo=True)[0]
968
 
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
969
 
            force_new_repo=True, repo_format_name=repo_name)
970
 
        if control is None:
971
 
            # uninitialisable format
972
 
            return
973
 
        self.assertNotEqual(repo.bzrdir.root_transport.base,
974
 
            made_repo.bzrdir.root_transport.base)
975
 
 
976
 
    def test_format_initialize_on_transport_ex_force_new_repo_False(self):
977
 
        t = self.get_transport('repo')
978
 
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
979
 
        repo_name = repo_fmt.repository_format.network_name()
980
 
        repo = repo_fmt.initialize_on_transport_ex(t,
981
 
            repo_format_name=repo_name, shared_repo=True)[0]
982
 
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
983
 
            force_new_repo=False, repo_format_name=repo_name)
984
 
        if control is None:
985
 
            # uninitialisable format
986
 
            return
987
 
        if not control._format.fixed_components:
988
 
            self.assertEqual(repo.bzrdir.root_transport.base,
989
 
                made_repo.bzrdir.root_transport.base)
990
 
 
991
 
    def test_format_initialize_on_transport_ex_stacked_on(self):
992
 
        # trunk is a stackable format.  Note that its in the same server area
993
 
        # which is what launchpad does, but not sufficient to exercise the
994
 
        # general case.
995
 
        trunk = self.make_branch('trunk', format='1.9')
996
 
        t = self.get_transport('stacked')
997
 
        old_fmt = bzrdir.format_registry.make_bzrdir('pack-0.92')
998
 
        repo_name = old_fmt.repository_format.network_name()
999
 
        # Should end up with a 1.9 format (stackable)
1000
 
        repo, control = self.assertInitializeEx(t, need_meta=True,
1001
 
            repo_format_name=repo_name, stacked_on='../trunk',
1002
 
            stack_on_pwd=t.base)
1003
 
        if control is None:
1004
 
            # uninitialisable format
1005
 
            return
1006
 
        self.assertLength(1, repo._fallback_repositories)
1007
 
 
1008
 
    def test_format_initialize_on_transport_ex_default_stack_on(self):
1009
 
        # When initialize_on_transport_ex uses a stacked-on branch because of
1010
 
        # a stacking policy on the target, the location of the fallback
1011
 
        # repository is the same as the external location of the stacked-on
1012
 
        # branch.
1013
 
        balloon = self.make_bzrdir('balloon')
1014
 
        if isinstance(balloon._format, bzrdir.BzrDirMetaFormat1):
1015
 
            stack_on = self.make_branch('stack-on', format='1.9')
1016
 
        else:
1017
 
            stack_on = self.make_branch('stack-on')
1018
 
        config = self.make_bzrdir('.').get_config()
1019
 
        try:
1020
 
            config.set_default_stack_on('stack-on')
1021
 
        except errors.BzrError:
1022
 
            raise TestNotApplicable('Only relevant for stackable formats.')
1023
 
        # Initialize a bzrdir subject to the policy.
1024
 
        t = self.get_transport('stacked')
1025
 
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
1026
 
        repo_name = repo_fmt.repository_format.network_name()
1027
 
        repo, control = self.assertInitializeEx(
1028
 
            t, need_meta=True, repo_format_name=repo_name, stacked_on=None)
1029
 
        # self.addCleanup(repo.unlock)
1030
 
        if control is None:
1031
 
            # uninitialisable format
1032
 
            return
1033
 
        # There's one fallback repo, with a public location.
1034
 
        self.assertLength(1, repo._fallback_repositories)
1035
 
        fallback_repo = repo._fallback_repositories[0]
1036
 
        self.assertEqual(
1037
 
            stack_on.base, fallback_repo.bzrdir.root_transport.base)
1038
 
        # The bzrdir creates a branch in stacking-capable format.
1039
 
        new_branch = control.create_branch()
1040
 
        self.assertTrue(new_branch._format.supports_stacking())
1041
 
 
1042
 
    def test_format_initialize_on_transport_ex_repo_fmt_name_None(self):
1043
 
        t = self.get_transport('dir')
1044
 
        repo, control = self.assertInitializeEx(t)
1045
 
        self.assertEqual(None, repo)
1046
 
 
1047
 
    def test_format_initialize_on_transport_ex_repo_fmt_name_followed(self):
1048
 
        t = self.get_transport('dir')
1049
 
        # 1.6 is likely to never be default
1050
 
        fmt = bzrdir.format_registry.make_bzrdir('1.6')
1051
 
        repo_name = fmt.repository_format.network_name()
1052
 
        repo, control = self.assertInitializeEx(t, repo_format_name=repo_name)
1053
 
        if control is None:
1054
 
            # uninitialisable format
1055
 
            return
1056
 
        if self.bzrdir_format.fixed_components:
1057
 
            # must stay with the all-in-one-format.
1058
 
            repo_name = self.bzrdir_format.network_name()
1059
 
        self.assertEqual(repo_name, repo._format.network_name())
1060
 
 
1061
 
    def assertInitializeEx(self, t, need_meta=False, **kwargs):
1062
 
        """Execute initialize_on_transport_ex and check it succeeded correctly.
1063
 
 
1064
 
        This involves checking that the disk objects were created, open with
1065
 
        the same format returned, and had the expected disk format.
1066
 
 
1067
 
        :param t: The transport to initialize on.
1068
 
        :param **kwargs: Additional arguments to pass to
1069
 
            initialize_on_transport_ex.
1070
 
        :return: the resulting repo, control dir tuple.
1071
 
        """
1072
 
        if not self.bzrdir_format.is_supported():
1073
 
            # Not initializable - not a failure either.
1074
 
            return None, None
1075
 
        repo, control, require_stacking, repo_policy = \
1076
 
            self.bzrdir_format.initialize_on_transport_ex(t, **kwargs)
1077
 
        if repo is not None:
1078
 
            # Repositories are open write-locked
1079
 
            self.assertTrue(repo.is_write_locked())
1080
 
            self.addCleanup(repo.unlock)
1081
 
        self.assertIsInstance(control, controldir.ControlDir)
1082
 
        opened = bzrdir.BzrDir.open(t.base)
1083
 
        expected_format = self.bzrdir_format
1084
 
        if need_meta and expected_format.fixed_components:
1085
 
            # Pre-metadir formats change when we are making something that
1086
 
            # needs a metaformat, because clone is used for push.
1087
 
            expected_format = bzrdir.BzrDirMetaFormat1()
1088
 
        if not isinstance(expected_format, RemoteBzrDirFormat):
1089
 
            self.assertEqual(control._format.network_name(),
1090
 
                expected_format.network_name())
1091
 
            self.assertEqual(control._format.network_name(),
1092
 
                opened._format.network_name())
1093
 
        self.assertEqual(control.__class__, opened.__class__)
1094
 
        return repo, control
1095
 
 
1096
 
    def test_format_network_name(self):
1097
 
        # All control formats must have a network name.
1098
 
        dir = self.make_bzrdir('.')
1099
 
        format = dir._format
1100
 
        # We want to test that the network_name matches the actual format on
1101
 
        # disk. For local control dirsthat means that using network_name as a
1102
 
        # key in the registry gives back the same format. For remote obects
1103
 
        # we check that the network_name of the RemoteBzrDirFormat we have
1104
 
        # locally matches the actual format present on disk.
1105
 
        if isinstance(format, RemoteBzrDirFormat):
1106
 
            dir._ensure_real()
1107
 
            real_dir = dir._real_bzrdir
1108
 
            network_name = format.network_name()
1109
 
            self.assertEqual(real_dir._format.network_name(), network_name)
1110
 
        else:
1111
 
            registry = controldir.network_format_registry
1112
 
            network_name = format.network_name()
1113
 
            looked_up_format = registry.get(network_name)
1114
 
            self.assertTrue(
1115
 
                issubclass(format.__class__, looked_up_format.__class__))
1116
 
        # The network name must be a byte string.
1117
 
        self.assertIsInstance(network_name, str)
1118
 
 
1119
 
    def test_open_not_bzrdir(self):
1120
 
        # test the formats specific behaviour for no-content or similar dirs.
1121
 
        self.assertRaises(errors.NotBranchError,
1122
 
                          self.bzrdir_format.open,
1123
 
                          transport.get_transport(self.get_readonly_url()))
1124
 
 
1125
 
    def test_create_branch(self):
1126
 
        # a bzrdir can construct a branch and repository for itself.
1127
 
        if not self.bzrdir_format.is_supported():
1128
 
            # unsupported formats are not loopback testable
1129
 
            # because the default open will not open them and
1130
 
            # they may not be initializable.
1131
 
            return
1132
 
        t = self.get_transport()
1133
 
        made_control = self.bzrdir_format.initialize(t.base)
1134
 
        made_repo = made_control.create_repository()
1135
 
        made_branch = made_control.create_branch()
1136
 
        self.assertIsInstance(made_branch, bzrlib.branch.Branch)
1137
 
        self.assertEqual(made_control, made_branch.bzrdir)
1138
 
 
1139
 
    def test_open_branch(self):
1140
 
        if not self.bzrdir_format.is_supported():
1141
 
            # unsupported formats are not loopback testable
1142
 
            # because the default open will not open them and
1143
 
            # they may not be initializable.
1144
 
            return
1145
 
        t = self.get_transport()
1146
 
        made_control = self.bzrdir_format.initialize(t.base)
1147
 
        made_repo = made_control.create_repository()
1148
 
        made_branch = made_control.create_branch()
1149
 
        opened_branch = made_control.open_branch()
1150
 
        self.assertEqual(made_control, opened_branch.bzrdir)
1151
 
        self.assertIsInstance(opened_branch, made_branch.__class__)
1152
 
        self.assertIsInstance(opened_branch._format, made_branch._format.__class__)
1153
 
 
1154
 
    def test_list_branches(self):
1155
 
        if not self.bzrdir_format.is_supported():
1156
 
            # unsupported formats are not loopback testable
1157
 
            # because the default open will not open them and
1158
 
            # they may not be initializable.
1159
 
            return
1160
 
        t = self.get_transport()
1161
 
        made_control = self.bzrdir_format.initialize(t.base)
1162
 
        made_repo = made_control.create_repository()
1163
 
        made_branch = made_control.create_branch()
1164
 
        branches = made_control.list_branches()
1165
 
        self.assertEquals(1, len(branches))
1166
 
        self.assertEquals(made_branch.base, branches[0].base)
1167
 
        try:
1168
 
            made_control.destroy_branch()
1169
 
        except errors.UnsupportedOperation:
1170
 
            pass # Not all bzrdirs support destroying directories
1171
 
        else:
1172
 
            self.assertEquals([], made_control.list_branches())
1173
 
 
1174
 
    def test_create_repository(self):
1175
 
        # a bzrdir can construct a repository for itself.
1176
 
        if not self.bzrdir_format.is_supported():
1177
 
            # unsupported formats are not loopback testable
1178
 
            # because the default open will not open them and
1179
 
            # they may not be initializable.
1180
 
            return
1181
 
        t = self.get_transport()
1182
 
        made_control = self.bzrdir_format.initialize(t.base)
1183
 
        made_repo = made_control.create_repository()
1184
 
        # Check that we have a repository object.
1185
 
        made_repo.has_revision('foo')
1186
 
        self.assertEqual(made_control, made_repo.bzrdir)
1187
 
 
1188
 
    def test_create_repository_shared(self):
1189
 
        # a bzrdir can create a shared repository or
1190
 
        # fail appropriately
1191
 
        if not self.bzrdir_format.is_supported():
1192
 
            # unsupported formats are not loopback testable
1193
 
            # because the default open will not open them and
1194
 
            # they may not be initializable.
1195
 
            return
1196
 
        t = self.get_transport()
1197
 
        made_control = self.bzrdir_format.initialize(t.base)
1198
 
        try:
1199
 
            made_repo = made_control.create_repository(shared=True)
1200
 
        except errors.IncompatibleFormat:
1201
 
            # Old bzrdir formats don't support shared repositories
1202
 
            # and should raise IncompatibleFormat
1203
 
            return
1204
 
        self.assertTrue(made_repo.is_shared())
1205
 
 
1206
 
    def test_create_repository_nonshared(self):
1207
 
        # a bzrdir can create a non-shared repository
1208
 
        if not self.bzrdir_format.is_supported():
1209
 
            # unsupported formats are not loopback testable
1210
 
            # because the default open will not open them and
1211
 
            # they may not be initializable.
1212
 
            return
1213
 
        t = self.get_transport()
1214
 
        made_control = self.bzrdir_format.initialize(t.base)
1215
 
        made_repo = made_control.create_repository(shared=False)
1216
 
        self.assertFalse(made_repo.is_shared())
1217
 
 
1218
 
    def test_open_repository(self):
1219
 
        if not self.bzrdir_format.is_supported():
1220
 
            # unsupported formats are not loopback testable
1221
 
            # because the default open will not open them and
1222
 
            # they may not be initializable.
1223
 
            return
1224
 
        t = self.get_transport()
1225
 
        made_control = self.bzrdir_format.initialize(t.base)
1226
 
        made_repo = made_control.create_repository()
1227
 
        opened_repo = made_control.open_repository()
1228
 
        self.assertEqual(made_control, opened_repo.bzrdir)
1229
 
        self.assertIsInstance(opened_repo, made_repo.__class__)
1230
 
        self.assertIsInstance(opened_repo._format, made_repo._format.__class__)
1231
 
 
1232
 
    def test_create_workingtree(self):
1233
 
        # a bzrdir can construct a working tree for itself.
1234
 
        if not self.bzrdir_format.is_supported():
1235
 
            # unsupported formats are not loopback testable
1236
 
            # because the default open will not open them and
1237
 
            # they may not be initializable.
1238
 
            return
1239
 
        t = self.get_transport()
1240
 
        made_control = self.bzrdir_format.initialize(t.base)
1241
 
        made_repo = made_control.create_repository()
1242
 
        made_branch = made_control.create_branch()
1243
 
        made_tree = self.createWorkingTreeOrSkip(made_control)
1244
 
        self.assertIsInstance(made_tree, workingtree.WorkingTree)
1245
 
        self.assertEqual(made_control, made_tree.bzrdir)
1246
 
 
1247
 
    def test_create_workingtree_revision(self):
1248
 
        # a bzrdir can construct a working tree for itself @ a specific revision.
1249
 
        t = self.get_transport()
1250
 
        source = self.make_branch_and_tree('source')
1251
 
        source.commit('a', rev_id='a', allow_pointless=True)
1252
 
        source.commit('b', rev_id='b', allow_pointless=True)
1253
 
        t.mkdir('new')
1254
 
        t_new = t.clone('new')
1255
 
        made_control = self.bzrdir_format.initialize_on_transport(t_new)
1256
 
        source.branch.repository.clone(made_control)
1257
 
        source.branch.clone(made_control)
1258
 
        try:
1259
 
            made_tree = made_control.create_workingtree(revision_id='a')
1260
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
1261
 
            raise TestSkipped("Can't make working tree on transport %r" % t)
1262
 
        self.assertEqual(['a'], made_tree.get_parent_ids())
1263
 
 
1264
 
    def test_open_workingtree(self):
1265
 
        if not self.bzrdir_format.is_supported():
1266
 
            # unsupported formats are not loopback testable
1267
 
            # because the default open will not open them and
1268
 
            # they may not be initializable.
1269
 
            return
1270
 
        # this has to be tested with local access as we still support creating
1271
 
        # format 6 bzrdirs
1272
 
        t = self.get_transport()
1273
 
        try:
1274
 
            made_control = self.bzrdir_format.initialize(t.base)
1275
 
            made_repo = made_control.create_repository()
1276
 
            made_branch = made_control.create_branch()
1277
 
            made_tree = made_control.create_workingtree()
1278
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
1279
 
            raise TestSkipped("Can't initialize %r on transport %r"
1280
 
                              % (self.bzrdir_format, t))
1281
 
        opened_tree = made_control.open_workingtree()
1282
 
        self.assertEqual(made_control, opened_tree.bzrdir)
1283
 
        self.assertIsInstance(opened_tree, made_tree.__class__)
1284
 
        self.assertIsInstance(opened_tree._format, made_tree._format.__class__)
1285
 
 
1286
 
    def test_get_selected_branch(self):
1287
 
        # The segment parameters are accessible from the root transport
1288
 
        # if a URL with segment parameters is opened.
1289
 
        if not self.bzrdir_format.is_supported():
1290
 
            # unsupported formats are not loopback testable
1291
 
            # because the default open will not open them and
1292
 
            # they may not be initializable.
1293
 
            return
1294
 
        t = self.get_transport()
1295
 
        try:
1296
 
            made_control = self.bzrdir_format.initialize(t.base)
1297
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
1298
 
            raise TestSkipped("Can't initialize %r on transport %r"
1299
 
                              % (self.bzrdir_format, t))
1300
 
        dir = bzrdir.BzrDir.open(t.base+",branch=foo")
1301
 
        self.assertEquals({"branch": "foo"},
1302
 
            dir.user_transport.get_segment_parameters())
1303
 
        self.assertEquals("foo", dir._get_selected_branch())
1304
 
 
1305
 
    def test_get_selected_branch_none_selected(self):
1306
 
        # _get_selected_branch defaults to None
1307
 
        if not self.bzrdir_format.is_supported():
1308
 
            # unsupported formats are not loopback testable
1309
 
            # because the default open will not open them and
1310
 
            # they may not be initializable.
1311
 
            return
1312
 
        t = self.get_transport()
1313
 
        try:
1314
 
            made_control = self.bzrdir_format.initialize(t.base)
1315
 
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
1316
 
            raise TestSkipped("Can't initialize %r on transport %r"
1317
 
                              % (self.bzrdir_format, t))
1318
 
        dir = bzrdir.BzrDir.open(t.base)
1319
 
        self.assertIs(None, dir._get_selected_branch())
1320
 
 
1321
 
    def test_root_transport(self):
1322
 
        dir = self.make_bzrdir('.')
1323
 
        self.assertEqual(dir.root_transport.base,
1324
 
                         self.get_transport().base)
1325
 
 
1326
 
    def test_find_repository_no_repo_under_standalone_branch(self):
1327
 
        # finding a repo stops at standalone branches even if there is a
1328
 
        # higher repository available.
1329
 
        try:
1330
 
            repo = self.make_repository('.', shared=True)
1331
 
        except errors.IncompatibleFormat:
1332
 
            # need a shared repository to test this.
1333
 
            return
1334
 
        url = self.get_url('intermediate')
1335
 
        t = self.get_transport()
1336
 
        t.mkdir('intermediate')
1337
 
        t.mkdir('intermediate/child')
1338
 
        made_control = self.bzrdir_format.initialize(url)
1339
 
        made_control.create_repository()
1340
 
        innermost_control = self.bzrdir_format.initialize(
1341
 
            self.get_url('intermediate/child'))
1342
 
        try:
1343
 
            child_repo = innermost_control.open_repository()
1344
 
            # if there is a repository, then the format cannot ever hit this
1345
 
            # code path.
1346
 
            return
1347
 
        except errors.NoRepositoryPresent:
1348
 
            pass
1349
 
        self.assertRaises(errors.NoRepositoryPresent,
1350
 
                          innermost_control.find_repository)
1351
 
 
1352
 
    def test_find_repository_containing_shared_repository(self):
1353
 
        # find repo inside a shared repo with an empty control dir
1354
 
        # returns the shared repo.
1355
 
        try:
1356
 
            repo = self.make_repository('.', shared=True)
1357
 
        except errors.IncompatibleFormat:
1358
 
            # need a shared repository to test this.
1359
 
            return
1360
 
        url = self.get_url('childbzrdir')
1361
 
        self.get_transport().mkdir('childbzrdir')
1362
 
        made_control = self.bzrdir_format.initialize(url)
1363
 
        try:
1364
 
            child_repo = made_control.open_repository()
1365
 
            # if there is a repository, then the format cannot ever hit this
1366
 
            # code path.
1367
 
            return
1368
 
        except errors.NoRepositoryPresent:
1369
 
            pass
1370
 
        found_repo = made_control.find_repository()
1371
 
        self.assertEqual(repo.bzrdir.root_transport.base,
1372
 
                         found_repo.bzrdir.root_transport.base)
1373
 
 
1374
 
    def test_find_repository_standalone_with_containing_shared_repository(self):
1375
 
        # find repo inside a standalone repo inside a shared repo finds the standalone repo
1376
 
        try:
1377
 
            containing_repo = self.make_repository('.', shared=True)
1378
 
        except errors.IncompatibleFormat:
1379
 
            # need a shared repository to test this.
1380
 
            return
1381
 
        child_repo = self.make_repository('childrepo')
1382
 
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
1383
 
        found_repo = opened_control.find_repository()
1384
 
        self.assertEqual(child_repo.bzrdir.root_transport.base,
1385
 
                         found_repo.bzrdir.root_transport.base)
1386
 
 
1387
 
    def test_find_repository_shared_within_shared_repository(self):
1388
 
        # find repo at a shared repo inside a shared repo finds the inner repo
1389
 
        try:
1390
 
            containing_repo = self.make_repository('.', shared=True)
1391
 
        except errors.IncompatibleFormat:
1392
 
            # need a shared repository to test this.
1393
 
            return
1394
 
        url = self.get_url('childrepo')
1395
 
        self.get_transport().mkdir('childrepo')
1396
 
        child_control = self.bzrdir_format.initialize(url)
1397
 
        child_repo = child_control.create_repository(shared=True)
1398
 
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
1399
 
        found_repo = opened_control.find_repository()
1400
 
        self.assertEqual(child_repo.bzrdir.root_transport.base,
1401
 
                         found_repo.bzrdir.root_transport.base)
1402
 
        self.assertNotEqual(child_repo.bzrdir.root_transport.base,
1403
 
                            containing_repo.bzrdir.root_transport.base)
1404
 
 
1405
 
    def test_find_repository_with_nested_dirs_works(self):
1406
 
        # find repo inside a bzrdir inside a bzrdir inside a shared repo
1407
 
        # finds the outer shared repo.
1408
 
        try:
1409
 
            repo = self.make_repository('.', shared=True)
1410
 
        except errors.IncompatibleFormat:
1411
 
            # need a shared repository to test this.
1412
 
            return
1413
 
        url = self.get_url('intermediate')
1414
 
        t = self.get_transport()
1415
 
        t.mkdir('intermediate')
1416
 
        t.mkdir('intermediate/child')
1417
 
        made_control = self.bzrdir_format.initialize(url)
1418
 
        try:
1419
 
            child_repo = made_control.open_repository()
1420
 
            # if there is a repository, then the format cannot ever hit this
1421
 
            # code path.
1422
 
            return
1423
 
        except errors.NoRepositoryPresent:
1424
 
            pass
1425
 
        innermost_control = self.bzrdir_format.initialize(
1426
 
            self.get_url('intermediate/child'))
1427
 
        try:
1428
 
            child_repo = innermost_control.open_repository()
1429
 
            # if there is a repository, then the format cannot ever hit this
1430
 
            # code path.
1431
 
            return
1432
 
        except errors.NoRepositoryPresent:
1433
 
            pass
1434
 
        found_repo = innermost_control.find_repository()
1435
 
        self.assertEqual(repo.bzrdir.root_transport.base,
1436
 
                         found_repo.bzrdir.root_transport.base)
1437
 
 
1438
 
    def test_can_and_needs_format_conversion(self):
1439
 
        # check that we can ask an instance if its upgradable
1440
 
        dir = self.make_bzrdir('.')
1441
 
        if dir.can_convert_format():
1442
 
            # if its default updatable there must be an updater
1443
 
            # (we force the latest known format as downgrades may not be
1444
 
            # available
1445
 
            self.assertTrue(isinstance(dir._format.get_converter(
1446
 
                format=dir._format), controldir.Converter))
1447
 
        dir.needs_format_conversion(
1448
 
            controldir.ControlDirFormat.get_default_format())
1449
 
 
1450
 
    def test_backup_copies_existing(self):
1451
 
        tree = self.make_branch_and_tree('test')
1452
 
        self.build_tree(['test/a'])
1453
 
        tree.add(['a'], ['a-id'])
1454
 
        tree.commit('some data to be copied.')
1455
 
        old_url, new_url = tree.bzrdir.backup_bzrdir()
1456
 
        old_path = urlutils.local_path_from_url(old_url)
1457
 
        new_path = urlutils.local_path_from_url(new_url)
1458
 
        self.assertPathExists(old_path)
1459
 
        self.assertPathExists(new_path)
1460
 
        for (((dir_relpath1, _), entries1),
1461
 
             ((dir_relpath2, _), entries2)) in izip(
1462
 
                osutils.walkdirs(old_path),
1463
 
                osutils.walkdirs(new_path)):
1464
 
            self.assertEquals(dir_relpath1, dir_relpath2)
1465
 
            for f1, f2 in zip(entries1, entries2):
1466
 
                self.assertEquals(f1[0], f2[0])
1467
 
                self.assertEquals(f1[2], f2[2])
1468
 
                if f1[2] == "file":
1469
 
                    osutils.compare_files(open(f1[4]), open(f2[4]))
1470
 
 
1471
 
    def test_upgrade_new_instance(self):
1472
 
        """Does an available updater work?"""
1473
 
        dir = self.make_bzrdir('.')
1474
 
        # for now, upgrade is not ready for partial bzrdirs.
1475
 
        dir.create_repository()
1476
 
        dir.create_branch()
1477
 
        self.createWorkingTreeOrSkip(dir)
1478
 
        if dir.can_convert_format():
1479
 
            # if its default updatable there must be an updater
1480
 
            # (we force the latest known format as downgrades may not be
1481
 
            # available
1482
 
            pb = ui.ui_factory.nested_progress_bar()
1483
 
            try:
1484
 
                dir._format.get_converter(format=dir._format).convert(dir, pb)
1485
 
            finally:
1486
 
                pb.finished()
1487
 
            # and it should pass 'check' now.
1488
 
            check.check_dwim(self.get_url('.'), False, True, True)
1489
 
 
1490
 
    def test_format_description(self):
1491
 
        dir = self.make_bzrdir('.')
1492
 
        text = dir._format.get_format_description()
1493
 
        self.assertTrue(len(text))
1494
 
 
1495
 
 
1496
 
class TestBreakLock(TestCaseWithControlDir):
1497
 
 
1498
 
    def test_break_lock_empty(self):
1499
 
        # break lock on an empty bzrdir should work silently.
1500
 
        dir = self.make_bzrdir('.')
1501
 
        try:
1502
 
            dir.break_lock()
1503
 
        except NotImplementedError:
1504
 
            pass
1505
 
 
1506
 
    def test_break_lock_repository(self):
1507
 
        # break lock with just a repo should unlock the repo.
1508
 
        repo = self.make_repository('.')
1509
 
        repo.lock_write()
1510
 
        lock_repo = repo.bzrdir.open_repository()
1511
 
        if not lock_repo.get_physical_lock_status():
1512
 
            # This bzrdir's default repository does not physically lock things
1513
 
            # and thus this interaction cannot be tested at the interface
1514
 
            # level.
1515
 
            repo.unlock()
1516
 
            return
1517
 
        # only one yes needed here: it should only be unlocking
1518
 
        # the repo
1519
 
        bzrlib.ui.ui_factory = CannedInputUIFactory([True])
1520
 
        try:
1521
 
            repo.bzrdir.break_lock()
1522
 
        except NotImplementedError:
1523
 
            # this bzrdir does not implement break_lock - so we cant test it.
1524
 
            repo.unlock()
1525
 
            return
1526
 
        lock_repo.lock_write()
1527
 
        lock_repo.unlock()
1528
 
        self.assertRaises(errors.LockBroken, repo.unlock)
1529
 
 
1530
 
    def test_break_lock_branch(self):
1531
 
        # break lock with just a repo should unlock the branch.
1532
 
        # and not directly try the repository.
1533
 
        # we test this by making a branch reference to a branch
1534
 
        # and repository in another bzrdir
1535
 
        # for pre-metadir formats this will fail, thats ok.
1536
 
        master = self.make_branch('branch')
1537
 
        thisdir = self.make_bzrdir('this')
1538
 
        try:
1539
 
            bzrlib.branch.BranchReferenceFormat().initialize(
1540
 
                thisdir, target_branch=master)
1541
 
        except errors.IncompatibleFormat:
1542
 
            return
1543
 
        unused_repo = thisdir.create_repository()
1544
 
        master.lock_write()
1545
 
        unused_repo.lock_write()
1546
 
        try:
1547
 
            # two yes's : branch and repository. If the repo in this
1548
 
            # dir is inappropriately accessed, 3 will be needed, and
1549
 
            # we'll see that because the stream will be fully consumed
1550
 
            bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
1551
 
            # determine if the repository will have been locked;
1552
 
            this_repo_locked = \
1553
 
                thisdir.open_repository().get_physical_lock_status()
1554
 
            master.bzrdir.break_lock()
1555
 
            if this_repo_locked:
1556
 
                # only two ys should have been read
1557
 
                self.assertEqual([True],
1558
 
                    bzrlib.ui.ui_factory.responses)
1559
 
            else:
1560
 
                # only one y should have been read
1561
 
                self.assertEqual([True, True],
1562
 
                    bzrlib.ui.ui_factory.responses)
1563
 
            # we should be able to lock a newly opened branch now
1564
 
            branch = master.bzrdir.open_branch()
1565
 
            branch.lock_write()
1566
 
            branch.unlock()
1567
 
            if this_repo_locked:
1568
 
                # we should not be able to lock the repository in thisdir as
1569
 
                # its still held by the explicit lock we took, and the break
1570
 
                # lock should not have touched it.
1571
 
                repo = thisdir.open_repository()
1572
 
                self.assertRaises(errors.LockContention, repo.lock_write)
1573
 
        finally:
1574
 
            unused_repo.unlock()
1575
 
        self.assertRaises(errors.LockBroken, master.unlock)
1576
 
 
1577
 
    def test_break_lock_tree(self):
1578
 
        # break lock with a tree should unlock the tree but not try the
1579
 
        # branch explicitly. However this is very hard to test for as we
1580
 
        # dont have a tree reference class, nor is one needed;
1581
 
        # the worst case if this code unlocks twice is an extra question
1582
 
        # being asked.
1583
 
        tree = self.make_branch_and_tree('.')
1584
 
        tree.lock_write()
1585
 
        # three yes's : tree, branch and repository.
1586
 
        bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
1587
 
        try:
1588
 
            tree.bzrdir.break_lock()
1589
 
        except (NotImplementedError, errors.LockActive):
1590
 
            # bzrdir does not support break_lock
1591
 
            # or one of the locked objects (currently only tree does this)
1592
 
            # raised a LockActive because we do still have a live locked
1593
 
            # object.
1594
 
            tree.unlock()
1595
 
            return
1596
 
        self.assertEqual([True],
1597
 
                bzrlib.ui.ui_factory.responses)
1598
 
        lock_tree = tree.bzrdir.open_workingtree()
1599
 
        lock_tree.lock_write()
1600
 
        lock_tree.unlock()
1601
 
        self.assertRaises(errors.LockBroken, tree.unlock)
1602
 
 
1603
 
 
1604
 
class TestTransportConfig(TestCaseWithControlDir):
1605
 
 
1606
 
    def test_get_config(self):
1607
 
        my_dir = self.make_bzrdir('.')
1608
 
        config = my_dir.get_config()
1609
 
        try:
1610
 
            config.set_default_stack_on('http://example.com')
1611
 
        except errors.BzrError, e:
1612
 
            if 'Cannot set config' in str(e):
1613
 
                self.assertFalse(
1614
 
                    isinstance(my_dir, (bzrdir.BzrDirMeta1, RemoteBzrDir)),
1615
 
                    "%r should support configs" % my_dir)
1616
 
                raise TestNotApplicable(
1617
 
                    'This BzrDir format does not support configs.')
1618
 
            else:
1619
 
                raise
1620
 
        self.assertEqual('http://example.com', config.get_default_stack_on())
1621
 
        my_dir2 = bzrdir.BzrDir.open(self.get_url('.'))
1622
 
        config2 = my_dir2.get_config()
1623
 
        self.assertEqual('http://example.com', config2.get_default_stack_on())
1624
 
 
1625
 
 
1626
 
class ChrootedControlDirTests(ChrootedTestCase):
1627
 
 
1628
 
    def test_find_repository_no_repository(self):
1629
 
        # loopback test to check the current format fails to find a
1630
 
        # share repository correctly.
1631
 
        if not self.bzrdir_format.is_supported():
1632
 
            # unsupported formats are not loopback testable
1633
 
            # because the default open will not open them and
1634
 
            # they may not be initializable.
1635
 
            return
1636
 
        # supported formats must be able to init and open
1637
 
        # - do the vfs initialisation over the basic vfs transport
1638
 
        # XXX: TODO this should become a 'bzrdirlocation' api call.
1639
 
        url = self.get_vfs_only_url('subdir')
1640
 
        transport.get_transport(self.get_vfs_only_url()).mkdir('subdir')
 
10
# Copyright (C) 2006-2011 Canonical Ltd
 
11
#
 
12
# This program is free software; you can redistribute it and/or modify
 
13
# it under the terms of the GNU General Public License as published by
 
14
# the Free Software Foundation; either version 2 of the License, or
 
15
# (at your option) any later version.
 
16
#
 
17
# This program is distributed in the hope that it will be useful,
 
18
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
19
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
20
# GNU General Public License for more details.
 
21
#
 
22
# You should have received a copy of the GNU General Public License
 
23
# along with this program; if not, write to the Free Software
 
24
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
25
 
 
26
"""Tests for control directory implementations - tests a controldir format."""
 
27
 
 
28
from itertools import izip
 
29
 
 
30
import bzrlib.branch
 
31
from bzrlib import (
 
32
    bzrdir,
 
33
    check,
 
34
    controldir,
 
35
    errors,
 
36
    gpg,
 
37
    osutils,
 
38
    transport,
 
39
    ui,
 
40
    urlutils,
 
41
    workingtree,
 
42
    )
 
43
import bzrlib.revision
 
44
from bzrlib.tests import (
 
45
    fixtures,
 
46
    ChrootedTestCase,
 
47
    TestNotApplicable,
 
48
    TestSkipped,
 
49
    )
 
50
from bzrlib.tests.per_controldir import TestCaseWithControlDir
 
51
from bzrlib.transport.local import LocalTransport
 
52
from bzrlib.ui import (
 
53
    CannedInputUIFactory,
 
54
    )
 
55
from bzrlib.remote import (
 
56
    RemoteBzrDir,
 
57
    RemoteBzrDirFormat,
 
58
    RemoteRepository,
 
59
    )
 
60
 
 
61
 
 
62
class TestControlDir(TestCaseWithControlDir):
 
63
 
 
64
    def skipIfNoWorkingTree(self, a_bzrdir):
 
65
        """Raises TestSkipped if a_bzrdir doesn't have a working tree.
 
66
 
 
67
        If the bzrdir does have a workingtree, this is a no-op.
 
68
        """
 
69
        try:
 
70
            a_bzrdir.open_workingtree()
 
71
        except (errors.NotLocalUrl, errors.NoWorkingTree):
 
72
            raise TestSkipped("bzrdir on transport %r has no working tree"
 
73
                              % a_bzrdir.transport)
 
74
 
 
75
    def openWorkingTreeIfLocal(self, a_bzrdir):
 
76
        """If a_bzrdir is on a local transport, call open_workingtree() on it.
 
77
        """
 
78
        if not isinstance(a_bzrdir.root_transport, LocalTransport):
 
79
            # it's not local, but that's ok
 
80
            return
 
81
        a_bzrdir.open_workingtree()
 
82
 
 
83
    def createWorkingTreeOrSkip(self, a_bzrdir):
 
84
        """Create a working tree on a_bzrdir, or raise TestSkipped.
 
85
 
 
86
        A simple wrapper for create_workingtree that translates NotLocalUrl into
 
87
        TestSkipped.  Returns the newly created working tree.
 
88
        """
 
89
        try:
 
90
            return a_bzrdir.create_workingtree()
 
91
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
92
            raise TestSkipped("cannot make working tree with transport %r"
 
93
                              % a_bzrdir.transport)
 
94
 
 
95
    def sproutOrSkip(self, from_bzrdir, to_url, revision_id=None,
 
96
                     force_new_repo=False, accelerator_tree=None,
 
97
                     create_tree_if_local=True):
 
98
        """Sprout from_bzrdir into to_url, or raise TestSkipped.
 
99
 
 
100
        A simple wrapper for from_bzrdir.sprout that translates NotLocalUrl into
 
101
        TestSkipped.  Returns the newly sprouted bzrdir.
 
102
        """
 
103
        to_transport = transport.get_transport(to_url)
 
104
        if not isinstance(to_transport, LocalTransport):
 
105
            raise TestSkipped('Cannot sprout to remote bzrdirs.')
 
106
        target = from_bzrdir.sprout(to_url, revision_id=revision_id,
 
107
                                    force_new_repo=force_new_repo,
 
108
                                    possible_transports=[to_transport],
 
109
                                    accelerator_tree=accelerator_tree,
 
110
                                    create_tree_if_local=create_tree_if_local)
 
111
        return target
 
112
 
 
113
    def test_create_null_workingtree(self):
 
114
        dir = self.make_bzrdir('dir1')
 
115
        dir.create_repository()
 
116
        dir.create_branch()
 
117
        try:
 
118
            wt = dir.create_workingtree(revision_id=bzrlib.revision.NULL_REVISION)
 
119
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
120
            raise TestSkipped("cannot make working tree with transport %r"
 
121
                              % dir.transport)
 
122
        self.assertEqual([], wt.get_parent_ids())
 
123
 
 
124
    def test_destroy_workingtree(self):
 
125
        tree = self.make_branch_and_tree('tree')
 
126
        self.build_tree(['tree/file'])
 
127
        tree.add('file')
 
128
        tree.commit('first commit')
 
129
        bzrdir = tree.bzrdir
 
130
        try:
 
131
            bzrdir.destroy_workingtree()
 
132
        except errors.UnsupportedOperation:
 
133
            raise TestSkipped('Format does not support destroying tree')
 
134
        self.assertPathDoesNotExist('tree/file')
 
135
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
136
        bzrdir.create_workingtree()
 
137
        self.assertPathExists('tree/file')
 
138
        bzrdir.destroy_workingtree_metadata()
 
139
        self.assertPathExists('tree/file')
 
140
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
141
 
 
142
    def test_destroy_branch(self):
 
143
        branch = self.make_branch('branch')
 
144
        bzrdir = branch.bzrdir
 
145
        try:
 
146
            bzrdir.destroy_branch()
 
147
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
148
            raise TestNotApplicable('Format does not support destroying branch')
 
149
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
150
        bzrdir.create_branch()
 
151
        bzrdir.open_branch()
 
152
 
 
153
    def test_destroy_repository(self):
 
154
        repo = self.make_repository('repository')
 
155
        bzrdir = repo.bzrdir
 
156
        try:
 
157
            bzrdir.destroy_repository()
 
158
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
159
            raise TestNotApplicable('Format does not support destroying'
 
160
                                    ' repository')
 
161
        self.assertRaises(errors.NoRepositoryPresent, bzrdir.open_repository)
 
162
        bzrdir.create_repository()
 
163
        bzrdir.open_repository()
 
164
 
 
165
    def test_open_workingtree_raises_no_working_tree(self):
 
166
        """ControlDir.open_workingtree() should raise NoWorkingTree (rather than
 
167
        e.g. NotLocalUrl) if there is no working tree.
 
168
        """
 
169
        dir = self.make_bzrdir('source')
 
170
        vfs_dir = bzrdir.BzrDir.open(self.get_vfs_only_url('source'))
 
171
        if vfs_dir.has_workingtree():
 
172
            # This ControlDir format doesn't support ControlDirs without
 
173
            # working trees, so this test is irrelevant.
 
174
            return
 
175
        self.assertRaises(errors.NoWorkingTree, dir.open_workingtree)
 
176
 
 
177
    def test_clone_bzrdir_repository_under_shared(self):
 
178
        tree = self.make_branch_and_tree('commit_tree')
 
179
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
180
        tree.add('foo')
 
181
        tree.commit('revision 1', rev_id='1')
 
182
        dir = self.make_bzrdir('source')
 
183
        repo = dir.create_repository()
 
184
        repo.fetch(tree.branch.repository)
 
185
        self.assertTrue(repo.has_revision('1'))
 
186
        try:
 
187
            self.make_repository('target', shared=True)
 
188
        except errors.IncompatibleFormat:
 
189
            return
 
190
        target = dir.clone(self.get_url('target/child'))
 
191
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
192
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
193
 
 
194
    def test_clone_bzrdir_repository_branch_both_under_shared(self):
 
195
        # Create a shared repository
 
196
        try:
 
197
            shared_repo = self.make_repository('shared', shared=True)
 
198
        except errors.IncompatibleFormat:
 
199
            return
 
200
        # Make a branch, 'commit_tree', and working tree outside of the shared
 
201
        # repository, and commit some revisions to it.
 
202
        tree = self.make_branch_and_tree('commit_tree')
 
203
        self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
 
204
        tree.add('foo')
 
205
        tree.commit('revision 1', rev_id='1')
 
206
        tree.bzrdir.open_branch().generate_revision_history(
 
207
            bzrlib.revision.NULL_REVISION)
 
208
        tree.set_parent_trees([])
 
209
        tree.commit('revision 2', rev_id='2')
 
210
        # Copy the content (i.e. revisions) from the 'commit_tree' branch's
 
211
        # repository into the shared repository.
 
212
        tree.branch.repository.copy_content_into(shared_repo)
 
213
        # Make a branch 'source' inside the shared repository.
 
214
        dir = self.make_bzrdir('shared/source')
 
215
        dir.create_branch()
 
216
        # Clone 'source' to 'target', also inside the shared repository.
 
217
        target = dir.clone(self.get_url('shared/target'))
 
218
        # 'source', 'target', and the shared repo all have distinct bzrdirs.
 
219
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
220
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
221
        # The shared repository will contain revisions from the 'commit_tree'
 
222
        # repository, even revisions that are not part of the history of the
 
223
        # 'commit_tree' branch.
 
224
        self.assertTrue(shared_repo.has_revision('1'))
 
225
 
 
226
    def test_clone_bzrdir_repository_branch_only_source_under_shared(self):
 
227
        try:
 
228
            shared_repo = self.make_repository('shared', shared=True)
 
229
        except errors.IncompatibleFormat:
 
230
            return
 
231
        tree = self.make_branch_and_tree('commit_tree')
 
232
        self.build_tree(['commit_tree/foo'])
 
233
        tree.add('foo')
 
234
        tree.commit('revision 1', rev_id='1')
 
235
        tree.branch.bzrdir.open_branch().generate_revision_history(
 
236
            bzrlib.revision.NULL_REVISION)
 
237
        tree.set_parent_trees([])
 
238
        tree.commit('revision 2', rev_id='2')
 
239
        tree.branch.repository.copy_content_into(shared_repo)
 
240
        if shared_repo.make_working_trees():
 
241
            shared_repo.set_make_working_trees(False)
 
242
            self.assertFalse(shared_repo.make_working_trees())
 
243
        self.assertTrue(shared_repo.has_revision('1'))
 
244
        dir = self.make_bzrdir('shared/source')
 
245
        dir.create_branch()
 
246
        target = dir.clone(self.get_url('target'))
 
247
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
248
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
249
        branch = target.open_branch()
 
250
        self.assertTrue(branch.repository.has_revision('1'))
 
251
        self.assertFalse(branch.repository.make_working_trees())
 
252
        self.assertTrue(branch.repository.is_shared())
 
253
 
 
254
    def test_clone_bzrdir_repository_revision(self):
 
255
        # test for revision limiting, [smoke test, not corner case checks].
 
256
        # make a repository with some revisions,
 
257
        # and clone it with a revision limit.
 
258
        #
 
259
        tree = self.make_branch_and_tree('commit_tree')
 
260
        self.build_tree(['commit_tree/foo'])
 
261
        tree.add('foo')
 
262
        tree.commit('revision 1', rev_id='1')
 
263
        tree.branch.bzrdir.open_branch().generate_revision_history(
 
264
            bzrlib.revision.NULL_REVISION)
 
265
        tree.set_parent_trees([])
 
266
        tree.commit('revision 2', rev_id='2')
 
267
        source = self.make_repository('source')
 
268
        tree.branch.repository.copy_content_into(source)
 
269
        dir = source.bzrdir
 
270
        target = dir.clone(self.get_url('target'), revision_id='2')
 
271
        raise TestSkipped('revision limiting not strict yet')
 
272
 
 
273
    def test_clone_bzrdir_branch_and_repo_fixed_user_id(self):
 
274
        # Bug #430868 is about an email containing '.sig'
 
275
        self.overrideEnv('BZR_EMAIL', 'murphy@host.sighup.org')
 
276
        tree = self.make_branch_and_tree('commit_tree')
 
277
        self.build_tree(['commit_tree/foo'])
 
278
        tree.add('foo')
 
279
        rev1 = tree.commit('revision 1')
 
280
        tree_repo = tree.branch.repository
 
281
        tree_repo.lock_write()
 
282
        tree_repo.start_write_group()
 
283
        tree_repo.sign_revision(rev1, gpg.LoopbackGPGStrategy(None))
 
284
        tree_repo.commit_write_group()
 
285
        tree_repo.unlock()
 
286
        target = self.make_branch('target')
 
287
        tree.branch.repository.copy_content_into(target.repository)
 
288
        tree.branch.copy_content_into(target)
 
289
        self.assertTrue(target.repository.has_revision(rev1))
 
290
        self.assertEqual(
 
291
            tree_repo.get_signature_text(rev1),
 
292
            target.repository.get_signature_text(rev1))
 
293
 
 
294
    def test_clone_bzrdir_branch_and_repo_into_shared_repo(self):
 
295
        # by default cloning into a shared repo uses the shared repo.
 
296
        tree = self.make_branch_and_tree('commit_tree')
 
297
        self.build_tree(['commit_tree/foo'])
 
298
        tree.add('foo')
 
299
        tree.commit('revision 1')
 
300
        source = self.make_branch('source')
 
301
        tree.branch.repository.copy_content_into(source.repository)
 
302
        tree.branch.copy_content_into(source)
 
303
        try:
 
304
            self.make_repository('target', shared=True)
 
305
        except errors.IncompatibleFormat:
 
306
            return
 
307
        dir = source.bzrdir
 
308
        target = dir.clone(self.get_url('target/child'))
 
309
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
310
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
311
        self.assertEqual(source.revision_history(),
 
312
                         target.open_branch().revision_history())
 
313
 
 
314
    def test_clone_bzrdir_branch_revision(self):
 
315
        # test for revision limiting, [smoke test, not corner case checks].
 
316
        # make a branch with some revisions,
 
317
        # and clone it with a revision limit.
 
318
        #
 
319
        tree = self.make_branch_and_tree('commit_tree')
 
320
        self.build_tree(['commit_tree/foo'])
 
321
        tree.add('foo')
 
322
        tree.commit('revision 1', rev_id='1')
 
323
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
324
        source = self.make_branch('source')
 
325
        tree.branch.repository.copy_content_into(source.repository)
 
326
        tree.branch.copy_content_into(source)
 
327
        dir = source.bzrdir
 
328
        target = dir.clone(self.get_url('target'), revision_id='1')
 
329
        self.assertEqual('1', target.open_branch().last_revision())
 
330
 
 
331
    def test_clone_on_transport_preserves_repo_format(self):
 
332
        if self.bzrdir_format == bzrdir.format_registry.make_bzrdir('default'):
 
333
            format = 'knit'
 
334
        else:
 
335
            format = None
 
336
        source_branch = self.make_branch('source', format=format)
 
337
        # Ensure no format data is cached
 
338
        a_dir = bzrlib.branch.Branch.open_from_transport(
 
339
            self.get_transport('source')).bzrdir
 
340
        target_transport = self.get_transport('target')
 
341
        target_bzrdir = a_dir.clone_on_transport(target_transport)
 
342
        target_repo = target_bzrdir.open_repository()
 
343
        source_branch = bzrlib.branch.Branch.open(
 
344
            self.get_vfs_only_url('source'))
 
345
        if isinstance(target_repo, RemoteRepository):
 
346
            target_repo._ensure_real()
 
347
            target_repo = target_repo._real_repository
 
348
        self.assertEqual(target_repo._format, source_branch.repository._format)
 
349
 
 
350
    def test_clone_bzrdir_tree_revision(self):
 
351
        # test for revision limiting, [smoke test, not corner case checks].
 
352
        # make a tree with a revision with a last-revision
 
353
        # and clone it with a revision limit.
 
354
        # This smoke test just checks the revision-id is right. Tree specific
 
355
        # tests will check corner cases.
 
356
        tree = self.make_branch_and_tree('source')
 
357
        self.build_tree(['source/foo'])
 
358
        tree.add('foo')
 
359
        tree.commit('revision 1', rev_id='1')
 
360
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
361
        dir = tree.bzrdir
 
362
        target = dir.clone(self.get_url('target'), revision_id='1')
 
363
        self.skipIfNoWorkingTree(target)
 
364
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
365
 
 
366
    def test_clone_bzrdir_into_notrees_repo(self):
 
367
        """Cloning into a no-trees repo should not create a working tree"""
 
368
        tree = self.make_branch_and_tree('source')
 
369
        self.build_tree(['source/foo'])
 
370
        tree.add('foo')
 
371
        tree.commit('revision 1')
 
372
 
 
373
        try:
 
374
            repo = self.make_repository('repo', shared=True)
 
375
        except errors.IncompatibleFormat:
 
376
            raise TestNotApplicable('must support shared repositories')
 
377
        if repo.make_working_trees():
 
378
            repo.set_make_working_trees(False)
 
379
            self.assertFalse(repo.make_working_trees())
 
380
 
 
381
        dir = tree.bzrdir
 
382
        a_dir = dir.clone(self.get_url('repo/a'))
 
383
        a_dir.open_branch()
 
384
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
 
385
 
 
386
    def test_clone_respects_stacked(self):
 
387
        branch = self.make_branch('parent')
 
388
        child_transport = self.get_transport('child')
 
389
        child = branch.bzrdir.clone_on_transport(child_transport,
 
390
                                                 stacked_on=branch.base)
 
391
        self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)
 
392
 
 
393
    def test_get_branch_reference_on_reference(self):
 
394
        """get_branch_reference should return the right url."""
 
395
        referenced_branch = self.make_branch('referenced')
 
396
        dir = self.make_bzrdir('source')
 
397
        try:
 
398
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
399
                target_branch=referenced_branch)
 
400
        except errors.IncompatibleFormat:
 
401
            # this is ok too, not all formats have to support references.
 
402
            return
 
403
        self.assertEqual(referenced_branch.bzrdir.root_transport.abspath('') + '/',
 
404
            dir.get_branch_reference())
 
405
 
 
406
    def test_get_branch_reference_on_non_reference(self):
 
407
        """get_branch_reference should return None for non-reference branches."""
 
408
        branch = self.make_branch('referenced')
 
409
        self.assertEqual(None, branch.bzrdir.get_branch_reference())
 
410
 
 
411
    def test_get_branch_reference_no_branch(self):
 
412
        """get_branch_reference should not mask NotBranchErrors."""
 
413
        dir = self.make_bzrdir('source')
 
414
        if dir.has_branch():
 
415
            # this format does not support branchless bzrdirs.
 
416
            return
 
417
        self.assertRaises(errors.NotBranchError, dir.get_branch_reference)
 
418
 
 
419
    def test_sprout_bzrdir_empty(self):
 
420
        dir = self.make_bzrdir('source')
 
421
        target = dir.sprout(self.get_url('target'))
 
422
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
423
        # creates a new repository branch and tree
 
424
        target.open_repository()
 
425
        target.open_branch()
 
426
        self.openWorkingTreeIfLocal(target)
 
427
 
 
428
    def test_sprout_bzrdir_empty_under_shared_repo(self):
 
429
        # sprouting an empty dir into a repo uses the repo
 
430
        dir = self.make_bzrdir('source')
 
431
        try:
 
432
            self.make_repository('target', shared=True)
 
433
        except errors.IncompatibleFormat:
 
434
            return
 
435
        target = dir.sprout(self.get_url('target/child'))
 
436
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
437
        target.open_branch()
 
438
        try:
 
439
            target.open_workingtree()
 
440
        except errors.NoWorkingTree:
 
441
            # Some bzrdirs can never have working trees.
 
442
            self.assertFalse(target._format.supports_workingtrees)
 
443
 
 
444
    def test_sprout_bzrdir_empty_under_shared_repo_force_new(self):
 
445
        # the force_new_repo parameter should force use of a new repo in an empty
 
446
        # bzrdir's sprout logic
 
447
        dir = self.make_bzrdir('source')
 
448
        try:
 
449
            self.make_repository('target', shared=True)
 
450
        except errors.IncompatibleFormat:
 
451
            return
 
452
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
453
        target.open_repository()
 
454
        target.open_branch()
 
455
        self.openWorkingTreeIfLocal(target)
 
456
 
 
457
    def test_sprout_bzrdir_with_repository_to_shared(self):
 
458
        tree = self.make_branch_and_tree('commit_tree')
 
459
        self.build_tree(['commit_tree/foo'])
 
460
        tree.add('foo')
 
461
        tree.commit('revision 1', rev_id='1')
 
462
        tree.bzrdir.open_branch().generate_revision_history(
 
463
            bzrlib.revision.NULL_REVISION)
 
464
        tree.set_parent_trees([])
 
465
        tree.commit('revision 2', rev_id='2')
 
466
        source = self.make_repository('source')
 
467
        tree.branch.repository.copy_content_into(source)
 
468
        dir = source.bzrdir
 
469
        try:
 
470
            shared_repo = self.make_repository('target', shared=True)
 
471
        except errors.IncompatibleFormat:
 
472
            return
 
473
        target = dir.sprout(self.get_url('target/child'))
 
474
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
475
        self.assertTrue(shared_repo.has_revision('1'))
 
476
 
 
477
    def test_sprout_bzrdir_repository_branch_both_under_shared(self):
 
478
        try:
 
479
            shared_repo = self.make_repository('shared', shared=True)
 
480
        except errors.IncompatibleFormat:
 
481
            return
 
482
        tree = self.make_branch_and_tree('commit_tree')
 
483
        self.build_tree(['commit_tree/foo'])
 
484
        tree.add('foo')
 
485
        tree.commit('revision 1', rev_id='1')
 
486
        tree.bzrdir.open_branch().generate_revision_history(
 
487
            bzrlib.revision.NULL_REVISION)
 
488
        tree.set_parent_trees([])
 
489
        tree.commit('revision 2', rev_id='2')
 
490
        tree.branch.repository.copy_content_into(shared_repo)
 
491
        dir = self.make_bzrdir('shared/source')
 
492
        dir.create_branch()
 
493
        target = dir.sprout(self.get_url('shared/target'))
 
494
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
495
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
496
        self.assertTrue(shared_repo.has_revision('1'))
 
497
 
 
498
    def test_sprout_bzrdir_repository_branch_only_source_under_shared(self):
 
499
        try:
 
500
            shared_repo = self.make_repository('shared', shared=True)
 
501
        except errors.IncompatibleFormat:
 
502
            return
 
503
        tree = self.make_branch_and_tree('commit_tree')
 
504
        self.build_tree(['commit_tree/foo'])
 
505
        tree.add('foo')
 
506
        tree.commit('revision 1', rev_id='1')
 
507
        tree.bzrdir.open_branch().generate_revision_history(
 
508
            bzrlib.revision.NULL_REVISION)
 
509
        tree.set_parent_trees([])
 
510
        tree.commit('revision 2', rev_id='2')
 
511
        tree.branch.repository.copy_content_into(shared_repo)
 
512
        if shared_repo.make_working_trees():
 
513
            shared_repo.set_make_working_trees(False)
 
514
            self.assertFalse(shared_repo.make_working_trees())
 
515
        self.assertTrue(shared_repo.has_revision('1'))
 
516
        dir = self.make_bzrdir('shared/source')
 
517
        dir.create_branch()
 
518
        target = dir.sprout(self.get_url('target'))
 
519
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
520
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
521
        branch = target.open_branch()
 
522
        # The sprouted bzrdir has a branch, so only revisions referenced by
 
523
        # that branch are copied, rather than the whole repository.  It's an
 
524
        # empty branch, so none are copied.
 
525
        self.assertEqual([], branch.repository.all_revision_ids())
 
526
        if branch.bzrdir._format.supports_workingtrees:
 
527
            self.assertTrue(branch.repository.make_working_trees())
 
528
        self.assertFalse(branch.repository.is_shared())
 
529
 
 
530
    def test_sprout_bzrdir_repository_under_shared_force_new_repo(self):
 
531
        tree = self.make_branch_and_tree('commit_tree')
 
532
        self.build_tree(['commit_tree/foo'])
 
533
        tree.add('foo')
 
534
        tree.commit('revision 1', rev_id='1')
 
535
        tree.bzrdir.open_branch().generate_revision_history(
 
536
            bzrlib.revision.NULL_REVISION)
 
537
        tree.set_parent_trees([])
 
538
        tree.commit('revision 2', rev_id='2')
 
539
        source = self.make_repository('source')
 
540
        tree.branch.repository.copy_content_into(source)
 
541
        dir = source.bzrdir
 
542
        try:
 
543
            shared_repo = self.make_repository('target', shared=True)
 
544
        except errors.IncompatibleFormat:
 
545
            return
 
546
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
547
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
548
        self.assertFalse(shared_repo.has_revision('1'))
 
549
 
 
550
    def test_sprout_bzrdir_repository_revision(self):
 
551
        # test for revision limiting, [smoke test, not corner case checks].
 
552
        # make a repository with some revisions,
 
553
        # and sprout it with a revision limit.
 
554
        #
 
555
        tree = self.make_branch_and_tree('commit_tree')
 
556
        self.build_tree(['commit_tree/foo'])
 
557
        tree.add('foo')
 
558
        tree.commit('revision 1', rev_id='1')
 
559
        br = tree.bzrdir.open_branch()
 
560
        br.set_last_revision_info(0, bzrlib.revision.NULL_REVISION)
 
561
        tree.set_parent_trees([])
 
562
        tree.commit('revision 2', rev_id='2')
 
563
        source = self.make_repository('source')
 
564
        tree.branch.repository.copy_content_into(source)
 
565
        dir = source.bzrdir
 
566
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='2')
 
567
        raise TestSkipped('revision limiting not strict yet')
 
568
 
 
569
    def test_sprout_bzrdir_branch_and_repo_shared(self):
 
570
        # sprouting a branch with a repo into a shared repo uses the shared
 
571
        # repo
 
572
        tree = self.make_branch_and_tree('commit_tree')
 
573
        self.build_tree(['commit_tree/foo'])
 
574
        tree.add('foo')
 
575
        tree.commit('revision 1', rev_id='1')
 
576
        source = self.make_branch('source')
 
577
        tree.branch.repository.copy_content_into(source.repository)
 
578
        tree.bzrdir.open_branch().copy_content_into(source)
 
579
        dir = source.bzrdir
 
580
        try:
 
581
            shared_repo = self.make_repository('target', shared=True)
 
582
        except errors.IncompatibleFormat:
 
583
            return
 
584
        target = dir.sprout(self.get_url('target/child'))
 
585
        self.assertTrue(shared_repo.has_revision('1'))
 
586
 
 
587
    def test_sprout_bzrdir_branch_and_repo_shared_force_new_repo(self):
 
588
        # sprouting a branch with a repo into a shared repo uses the shared
 
589
        # repo
 
590
        tree = self.make_branch_and_tree('commit_tree')
 
591
        self.build_tree(['commit_tree/foo'])
 
592
        tree.add('foo')
 
593
        tree.commit('revision 1', rev_id='1')
 
594
        source = self.make_branch('source')
 
595
        tree.branch.repository.copy_content_into(source.repository)
 
596
        tree.bzrdir.open_branch().copy_content_into(source)
 
597
        dir = source.bzrdir
 
598
        try:
 
599
            shared_repo = self.make_repository('target', shared=True)
 
600
        except errors.IncompatibleFormat:
 
601
            return
 
602
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
603
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
604
        self.assertFalse(shared_repo.has_revision('1'))
 
605
 
 
606
    def test_sprout_bzrdir_branch_reference(self):
 
607
        # sprouting should create a repository if needed and a sprouted branch.
 
608
        referenced_branch = self.make_branch('referenced')
 
609
        dir = self.make_bzrdir('source')
 
610
        try:
 
611
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
612
                target_branch=referenced_branch)
 
613
        except errors.IncompatibleFormat:
 
614
            # this is ok too, not all formats have to support references.
 
615
            return
 
616
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
617
        target = dir.sprout(self.get_url('target'))
 
618
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
619
        # we want target to have a branch that is in-place.
 
620
        self.assertEqual(target, target.open_branch().bzrdir)
 
621
        # and as we dont support repositories being detached yet, a repo in
 
622
        # place
 
623
        target.open_repository()
 
624
 
 
625
    def test_sprout_bzrdir_branch_reference_shared(self):
 
626
        # sprouting should create a repository if needed and a sprouted branch.
 
627
        referenced_tree = self.make_branch_and_tree('referenced')
 
628
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
629
        dir = self.make_bzrdir('source')
 
630
        try:
 
631
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
632
                target_branch=referenced_tree.branch)
 
633
        except errors.IncompatibleFormat:
 
634
            # this is ok too, not all formats have to support references.
 
635
            return
 
636
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
637
        try:
 
638
            shared_repo = self.make_repository('target', shared=True)
 
639
        except errors.IncompatibleFormat:
 
640
            return
 
641
        target = dir.sprout(self.get_url('target/child'))
 
642
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
643
        # we want target to have a branch that is in-place.
 
644
        self.assertEqual(target, target.open_branch().bzrdir)
 
645
        # and we want no repository as the target is shared
 
646
        self.assertRaises(errors.NoRepositoryPresent,
 
647
                          target.open_repository)
 
648
        # and we want revision '1' in the shared repo
 
649
        self.assertTrue(shared_repo.has_revision('1'))
 
650
 
 
651
    def test_sprout_bzrdir_branch_reference_shared_force_new_repo(self):
 
652
        # sprouting should create a repository if needed and a sprouted branch.
 
653
        referenced_tree = self.make_branch_and_tree('referenced')
 
654
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
655
        dir = self.make_bzrdir('source')
 
656
        try:
 
657
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
658
                target_branch=referenced_tree.branch)
 
659
        except errors.IncompatibleFormat:
 
660
            # this is ok too, not all formats have to support references.
 
661
            return
 
662
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
663
        try:
 
664
            shared_repo = self.make_repository('target', shared=True)
 
665
        except errors.IncompatibleFormat:
 
666
            return
 
667
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
668
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
669
        # we want target to have a branch that is in-place.
 
670
        self.assertEqual(target, target.open_branch().bzrdir)
 
671
        # and we want revision '1' in the new repo
 
672
        self.assertTrue(target.open_repository().has_revision('1'))
 
673
        # but not the shared one
 
674
        self.assertFalse(shared_repo.has_revision('1'))
 
675
 
 
676
    def test_sprout_bzrdir_branch_revision(self):
 
677
        # test for revision limiting, [smoke test, not corner case checks].
 
678
        # make a repository with some revisions,
 
679
        # and sprout it with a revision limit.
 
680
        #
 
681
        tree = self.make_branch_and_tree('commit_tree')
 
682
        self.build_tree(['commit_tree/foo'])
 
683
        tree.add('foo')
 
684
        tree.commit('revision 1', rev_id='1')
 
685
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
686
        source = self.make_branch('source')
 
687
        tree.branch.repository.copy_content_into(source.repository)
 
688
        tree.bzrdir.open_branch().copy_content_into(source)
 
689
        dir = source.bzrdir
 
690
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
691
        self.assertEqual('1', target.open_branch().last_revision())
 
692
 
 
693
    def test_sprout_bzrdir_branch_with_tags(self):
 
694
        # when sprouting a branch all revisions named in the tags are copied
 
695
        # too.
 
696
        builder = self.make_branch_builder('source')
 
697
        source = fixtures.build_branch_with_non_ancestral_rev(builder)
 
698
        try:
 
699
            source.tags.set_tag('tag-a', 'rev-2')
 
700
        except errors.TagsNotSupported:
 
701
            raise TestNotApplicable('Branch format does not support tags.')
 
702
        source.get_config().set_user_option('branch.fetch_tags', 'True')
 
703
        # Now source has a tag not in its ancestry.  Sprout its controldir.
 
704
        dir = source.bzrdir
 
705
        target = dir.sprout(self.get_url('target'))
 
706
        # The tag is present, and so is its revision.
 
707
        new_branch = target.open_branch()
 
708
        self.assertEqual('rev-2', new_branch.tags.lookup_tag('tag-a'))
 
709
        new_branch.repository.get_revision('rev-2')
 
710
 
 
711
    def test_sprout_bzrdir_branch_with_absent_tag(self):
 
712
        # tags referencing absent revisions are copied (and those absent
 
713
        # revisions do not prevent the sprout.)
 
714
        builder = self.make_branch_builder('source')
 
715
        builder.build_commit(message="Rev 1", rev_id='rev-1')
 
716
        source = builder.get_branch()
 
717
        try:
 
718
            source.tags.set_tag('tag-a', 'missing-rev')
 
719
        except errors.TagsNotSupported:
 
720
            raise TestNotApplicable('Branch format does not support tags.')
 
721
        # Now source has a tag pointing to an absent revision.  Sprout its
 
722
        # controldir.
 
723
        dir = source.bzrdir
 
724
        target = dir.sprout(self.get_url('target'))
 
725
        # The tag is present in the target
 
726
        new_branch = target.open_branch()
 
727
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
 
728
 
 
729
    def test_sprout_bzrdir_passing_source_branch_with_absent_tag(self):
 
730
        # tags referencing absent revisions are copied (and those absent
 
731
        # revisions do not prevent the sprout.)
 
732
        builder = self.make_branch_builder('source')
 
733
        builder.build_commit(message="Rev 1", rev_id='rev-1')
 
734
        source = builder.get_branch()
 
735
        try:
 
736
            source.tags.set_tag('tag-a', 'missing-rev')
 
737
        except errors.TagsNotSupported:
 
738
            raise TestNotApplicable('Branch format does not support tags.')
 
739
        # Now source has a tag pointing to an absent revision.  Sprout its
 
740
        # controldir.
 
741
        dir = source.bzrdir
 
742
        target = dir.sprout(self.get_url('target'), source_branch=source)
 
743
        # The tag is present in the target
 
744
        new_branch = target.open_branch()
 
745
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
 
746
 
 
747
    def test_sprout_bzrdir_passing_rev_not_source_branch_copies_tags(self):
 
748
        # dir.sprout(..., revision_id='rev1') copies rev1, and all the tags of
 
749
        # the branch at that bzrdir, the ancestry of all of those, but no other
 
750
        # revs (not even the tip of the source branch).
 
751
        builder = self.make_branch_builder('source')
 
752
        builder.build_commit(message="Base", rev_id='base-rev')
 
753
        # Make three parallel lines of ancestry off this base.
 
754
        source = builder.get_branch()
 
755
        builder.build_commit(message="Rev A1", rev_id='rev-a1')
 
756
        builder.build_commit(message="Rev A2", rev_id='rev-a2')
 
757
        builder.build_commit(message="Rev A3", rev_id='rev-a3')
 
758
        source.set_last_revision_info(1, 'base-rev')
 
759
        builder.build_commit(message="Rev B1", rev_id='rev-b1')
 
760
        builder.build_commit(message="Rev B2", rev_id='rev-b2')
 
761
        builder.build_commit(message="Rev B3", rev_id='rev-b3')
 
762
        source.set_last_revision_info(1, 'base-rev')
 
763
        builder.build_commit(message="Rev C1", rev_id='rev-c1')
 
764
        builder.build_commit(message="Rev C2", rev_id='rev-c2')
 
765
        builder.build_commit(message="Rev C3", rev_id='rev-c3')
 
766
        # Set the branch tip to A2
 
767
        source.set_last_revision_info(3, 'rev-a2')
 
768
        try:
 
769
            # Create a tag for B2, and for an absent rev
 
770
            source.tags.set_tag('tag-non-ancestry', 'rev-b2')
 
771
            source.tags.set_tag('tag-absent', 'absent-rev')
 
772
        except errors.TagsNotSupported:
 
773
            raise TestNotApplicable('Branch format does not support tags.')
 
774
        source.get_config().set_user_option('branch.fetch_tags', 'True')
 
775
        # And ask sprout for C2
 
776
        dir = source.bzrdir
 
777
        target = dir.sprout(self.get_url('target'), revision_id='rev-c2')
 
778
        # The tags are present
 
779
        new_branch = target.open_branch()
 
780
        self.assertEqual(
 
781
            {'tag-absent': 'absent-rev', 'tag-non-ancestry': 'rev-b2'},
 
782
            new_branch.tags.get_tag_dict())
 
783
        # And the revs for A2, B2 and C2's ancestries are present, but no
 
784
        # others.
 
785
        self.assertEqual(
 
786
            ['base-rev', 'rev-b1', 'rev-b2', 'rev-c1', 'rev-c2'],
 
787
            sorted(new_branch.repository.all_revision_ids()))
 
788
 
 
789
    def test_sprout_bzrdir_tree_branch_reference(self):
 
790
        # sprouting should create a repository if needed and a sprouted branch.
 
791
        # the tree state should not be copied.
 
792
        referenced_branch = self.make_branch('referencced')
 
793
        dir = self.make_bzrdir('source')
 
794
        try:
 
795
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
796
                target_branch=referenced_branch)
 
797
        except errors.IncompatibleFormat:
 
798
            # this is ok too, not all formats have to support references.
 
799
            return
 
800
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
801
        tree = self.createWorkingTreeOrSkip(dir)
 
802
        self.build_tree(['source/subdir/'])
 
803
        tree.add('subdir')
 
804
        target = dir.sprout(self.get_url('target'))
 
805
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
806
        # we want target to have a branch that is in-place.
 
807
        self.assertEqual(target, target.open_branch().bzrdir)
 
808
        # and as we dont support repositories being detached yet, a repo in
 
809
        # place
 
810
        target.open_repository()
 
811
        result_tree = target.open_workingtree()
 
812
        self.assertFalse(result_tree.has_filename('subdir'))
 
813
 
 
814
    def test_sprout_bzrdir_tree_branch_reference_revision(self):
 
815
        # sprouting should create a repository if needed and a sprouted branch.
 
816
        # the tree state should not be copied but the revision changed,
 
817
        # and the likewise the new branch should be truncated too
 
818
        referenced_branch = self.make_branch('referencced')
 
819
        dir = self.make_bzrdir('source')
 
820
        try:
 
821
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
822
                target_branch=referenced_branch)
 
823
        except errors.IncompatibleFormat:
 
824
            # this is ok too, not all formats have to support references.
 
825
            return
 
826
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
827
        tree = self.createWorkingTreeOrSkip(dir)
 
828
        self.build_tree(['source/foo'])
 
829
        tree.add('foo')
 
830
        tree.commit('revision 1', rev_id='1')
 
831
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
832
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
833
        self.skipIfNoWorkingTree(target)
 
834
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
835
        # we want target to have a branch that is in-place.
 
836
        self.assertEqual(target, target.open_branch().bzrdir)
 
837
        # and as we dont support repositories being detached yet, a repo in
 
838
        # place
 
839
        target.open_repository()
 
840
        # we trust that the working tree sprouting works via the other tests.
 
841
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
842
        self.assertEqual('1', target.open_branch().last_revision())
 
843
 
 
844
    def test_sprout_bzrdir_tree_revision(self):
 
845
        # test for revision limiting, [smoke test, not corner case checks].
 
846
        # make a tree with a revision with a last-revision
 
847
        # and sprout it with a revision limit.
 
848
        # This smoke test just checks the revision-id is right. Tree specific
 
849
        # tests will check corner cases.
 
850
        tree = self.make_branch_and_tree('source')
 
851
        self.build_tree(['source/foo'])
 
852
        tree.add('foo')
 
853
        tree.commit('revision 1', rev_id='1')
 
854
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
855
        dir = tree.bzrdir
 
856
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='1')
 
857
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
858
 
 
859
    def test_sprout_takes_accelerator(self):
 
860
        tree = self.make_branch_and_tree('source')
 
861
        self.build_tree(['source/foo'])
 
862
        tree.add('foo')
 
863
        tree.commit('revision 1', rev_id='1')
 
864
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
865
        dir = tree.bzrdir
 
866
        target = self.sproutOrSkip(dir, self.get_url('target'),
 
867
                                   accelerator_tree=tree)
 
868
        self.assertEqual(['2'], target.open_workingtree().get_parent_ids())
 
869
 
 
870
    def test_sprout_branch_no_tree(self):
 
871
        tree = self.make_branch_and_tree('source')
 
872
        self.build_tree(['source/foo'])
 
873
        tree.add('foo')
 
874
        tree.commit('revision 1', rev_id='1')
 
875
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
876
        dir = tree.bzrdir
 
877
        try:
 
878
            target = dir.sprout(self.get_url('target'),
 
879
                create_tree_if_local=False)
 
880
        except errors.MustHaveWorkingTree:
 
881
            raise TestNotApplicable("control dir format requires working tree")
 
882
        self.assertPathDoesNotExist('target/foo')
 
883
        self.assertEqual(tree.branch.last_revision(),
 
884
                         target.open_branch().last_revision())
 
885
 
 
886
    def test_sprout_with_revision_id_uses_default_stack_on(self):
 
887
        # Make a branch with three commits to stack on.
 
888
        builder = self.make_branch_builder('stack-on')
 
889
        builder.start_series()
 
890
        builder.build_commit(message='Rev 1.', rev_id='rev-1')
 
891
        builder.build_commit(message='Rev 2.', rev_id='rev-2')
 
892
        builder.build_commit(message='Rev 3.', rev_id='rev-3')
 
893
        builder.finish_series()
 
894
        stack_on = builder.get_branch()
 
895
        # Make a bzrdir with a default stacking policy to stack on that branch.
 
896
        config = self.make_bzrdir('policy-dir').get_config()
 
897
        try:
 
898
            config.set_default_stack_on(self.get_url('stack-on'))
 
899
        except errors.BzrError:
 
900
            raise TestNotApplicable('Only relevant for stackable formats.')
 
901
        # Sprout the stacked-on branch into the bzrdir.
 
902
        sprouted = stack_on.bzrdir.sprout(
 
903
            self.get_url('policy-dir/sprouted'), revision_id='rev-3')
 
904
        # Not all revisions are copied into the sprouted repository.
 
905
        repo = sprouted.open_repository()
 
906
        self.addCleanup(repo.lock_read().unlock)
 
907
        self.assertEqual(None, repo.get_parent_map(['rev-1']).get('rev-1'))
 
908
 
 
909
    def test_format_initialize_find_open(self):
 
910
        # loopback test to check the current format initializes to itself.
 
911
        if not self.bzrdir_format.is_supported():
 
912
            # unsupported formats are not loopback testable
 
913
            # because the default open will not open them and
 
914
            # they may not be initializable.
 
915
            return
 
916
        # for remote formats, there must be no prior assumption about the
 
917
        # network name to use - it's possible that this may somehow have got
 
918
        # in through an unisolated test though - see
 
919
        # <https://bugs.launchpad.net/bzr/+bug/504102>
 
920
        self.assertEquals(getattr(self.bzrdir_format,
 
921
            '_network_name', None),
 
922
            None)
 
923
        # supported formats must be able to init and open
 
924
        t = self.get_transport()
 
925
        readonly_t = self.get_readonly_transport()
 
926
        made_control = self.bzrdir_format.initialize(t.base)
 
927
        self.assertIsInstance(made_control, controldir.ControlDir)
 
928
        if isinstance(self.bzrdir_format, RemoteBzrDirFormat):
 
929
            return
 
930
        self.assertEqual(self.bzrdir_format,
 
931
                         controldir.ControlDirFormat.find_format(readonly_t))
 
932
        direct_opened_dir = self.bzrdir_format.open(readonly_t)
 
933
        opened_dir = bzrdir.BzrDir.open(t.base)
 
934
        self.assertEqual(made_control._format,
 
935
                         opened_dir._format)
 
936
        self.assertEqual(direct_opened_dir._format,
 
937
                         opened_dir._format)
 
938
        self.assertIsInstance(opened_dir, controldir.ControlDir)
 
939
 
 
940
    def test_format_initialize_on_transport_ex(self):
 
941
        t = self.get_transport('dir')
 
942
        self.assertInitializeEx(t)
 
943
 
 
944
    def test_format_initialize_on_transport_ex_use_existing_dir_True(self):
 
945
        t = self.get_transport('dir')
 
946
        t.ensure_base()
 
947
        self.assertInitializeEx(t, use_existing_dir=True)
 
948
 
 
949
    def test_format_initialize_on_transport_ex_use_existing_dir_False(self):
 
950
        if not self.bzrdir_format.is_supported():
 
951
            # Not initializable - not a failure either.
 
952
            return
 
953
        t = self.get_transport('dir')
 
954
        t.ensure_base()
 
955
        self.assertRaises(errors.FileExists,
 
956
            self.bzrdir_format.initialize_on_transport_ex, t,
 
957
            use_existing_dir=False)
 
958
 
 
959
    def test_format_initialize_on_transport_ex_create_prefix_True(self):
 
960
        t = self.get_transport('missing/dir')
 
961
        self.assertInitializeEx(t, create_prefix=True)
 
962
 
 
963
    def test_format_initialize_on_transport_ex_create_prefix_False(self):
 
964
        if not self.bzrdir_format.is_supported():
 
965
            # Not initializable - not a failure either.
 
966
            return
 
967
        t = self.get_transport('missing/dir')
 
968
        self.assertRaises(errors.NoSuchFile, self.assertInitializeEx, t,
 
969
            create_prefix=False)
 
970
 
 
971
    def test_format_initialize_on_transport_ex_force_new_repo_True(self):
 
972
        t = self.get_transport('repo')
 
973
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
974
        repo_name = repo_fmt.repository_format.network_name()
 
975
        repo = repo_fmt.initialize_on_transport_ex(t,
 
976
            repo_format_name=repo_name, shared_repo=True)[0]
 
977
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
978
            force_new_repo=True, repo_format_name=repo_name)
 
979
        if control is None:
 
980
            # uninitialisable format
 
981
            return
 
982
        self.assertNotEqual(repo.bzrdir.root_transport.base,
 
983
            made_repo.bzrdir.root_transport.base)
 
984
 
 
985
    def test_format_initialize_on_transport_ex_force_new_repo_False(self):
 
986
        t = self.get_transport('repo')
 
987
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
988
        repo_name = repo_fmt.repository_format.network_name()
 
989
        repo = repo_fmt.initialize_on_transport_ex(t,
 
990
            repo_format_name=repo_name, shared_repo=True)[0]
 
991
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
992
            force_new_repo=False, repo_format_name=repo_name)
 
993
        if control is None:
 
994
            # uninitialisable format
 
995
            return
 
996
        if not control._format.fixed_components:
 
997
            self.assertEqual(repo.bzrdir.root_transport.base,
 
998
                made_repo.bzrdir.root_transport.base)
 
999
 
 
1000
    def test_format_initialize_on_transport_ex_stacked_on(self):
 
1001
        # trunk is a stackable format.  Note that its in the same server area
 
1002
        # which is what launchpad does, but not sufficient to exercise the
 
1003
        # general case.
 
1004
        trunk = self.make_branch('trunk', format='1.9')
 
1005
        t = self.get_transport('stacked')
 
1006
        old_fmt = bzrdir.format_registry.make_bzrdir('pack-0.92')
 
1007
        repo_name = old_fmt.repository_format.network_name()
 
1008
        # Should end up with a 1.9 format (stackable)
 
1009
        repo, control = self.assertInitializeEx(t, need_meta=True,
 
1010
            repo_format_name=repo_name, stacked_on='../trunk',
 
1011
            stack_on_pwd=t.base)
 
1012
        if control is None:
 
1013
            # uninitialisable format
 
1014
            return
 
1015
        self.assertLength(1, repo._fallback_repositories)
 
1016
 
 
1017
    def test_format_initialize_on_transport_ex_default_stack_on(self):
 
1018
        # When initialize_on_transport_ex uses a stacked-on branch because of
 
1019
        # a stacking policy on the target, the location of the fallback
 
1020
        # repository is the same as the external location of the stacked-on
 
1021
        # branch.
 
1022
        balloon = self.make_bzrdir('balloon')
 
1023
        if isinstance(balloon._format, bzrdir.BzrDirMetaFormat1):
 
1024
            stack_on = self.make_branch('stack-on', format='1.9')
 
1025
        else:
 
1026
            stack_on = self.make_branch('stack-on')
 
1027
        config = self.make_bzrdir('.').get_config()
 
1028
        try:
 
1029
            config.set_default_stack_on('stack-on')
 
1030
        except errors.BzrError:
 
1031
            raise TestNotApplicable('Only relevant for stackable formats.')
 
1032
        # Initialize a bzrdir subject to the policy.
 
1033
        t = self.get_transport('stacked')
 
1034
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
1035
        repo_name = repo_fmt.repository_format.network_name()
 
1036
        repo, control = self.assertInitializeEx(
 
1037
            t, need_meta=True, repo_format_name=repo_name, stacked_on=None)
 
1038
        # self.addCleanup(repo.unlock)
 
1039
        if control is None:
 
1040
            # uninitialisable format
 
1041
            return
 
1042
        # There's one fallback repo, with a public location.
 
1043
        self.assertLength(1, repo._fallback_repositories)
 
1044
        fallback_repo = repo._fallback_repositories[0]
 
1045
        self.assertEqual(
 
1046
            stack_on.base, fallback_repo.bzrdir.root_transport.base)
 
1047
        # The bzrdir creates a branch in stacking-capable format.
 
1048
        new_branch = control.create_branch()
 
1049
        self.assertTrue(new_branch._format.supports_stacking())
 
1050
 
 
1051
    def test_format_initialize_on_transport_ex_repo_fmt_name_None(self):
 
1052
        t = self.get_transport('dir')
 
1053
        repo, control = self.assertInitializeEx(t)
 
1054
        self.assertEqual(None, repo)
 
1055
 
 
1056
    def test_format_initialize_on_transport_ex_repo_fmt_name_followed(self):
 
1057
        t = self.get_transport('dir')
 
1058
        # 1.6 is likely to never be default
 
1059
        fmt = bzrdir.format_registry.make_bzrdir('1.6')
 
1060
        repo_name = fmt.repository_format.network_name()
 
1061
        repo, control = self.assertInitializeEx(t, repo_format_name=repo_name)
 
1062
        if control is None:
 
1063
            # uninitialisable format
 
1064
            return
 
1065
        if self.bzrdir_format.fixed_components:
 
1066
            # must stay with the all-in-one-format.
 
1067
            repo_name = self.bzrdir_format.network_name()
 
1068
        self.assertEqual(repo_name, repo._format.network_name())
 
1069
 
 
1070
    def assertInitializeEx(self, t, need_meta=False, **kwargs):
 
1071
        """Execute initialize_on_transport_ex and check it succeeded correctly.
 
1072
 
 
1073
        This involves checking that the disk objects were created, open with
 
1074
        the same format returned, and had the expected disk format.
 
1075
 
 
1076
        :param t: The transport to initialize on.
 
1077
        :param **kwargs: Additional arguments to pass to
 
1078
            initialize_on_transport_ex.
 
1079
        :return: the resulting repo, control dir tuple.
 
1080
        """
 
1081
        if not self.bzrdir_format.is_supported():
 
1082
            # Not initializable - not a failure either.
 
1083
            return None, None
 
1084
        repo, control, require_stacking, repo_policy = \
 
1085
            self.bzrdir_format.initialize_on_transport_ex(t, **kwargs)
 
1086
        if repo is not None:
 
1087
            # Repositories are open write-locked
 
1088
            self.assertTrue(repo.is_write_locked())
 
1089
            self.addCleanup(repo.unlock)
 
1090
        self.assertIsInstance(control, controldir.ControlDir)
 
1091
        opened = bzrdir.BzrDir.open(t.base)
 
1092
        expected_format = self.bzrdir_format
 
1093
        if need_meta and expected_format.fixed_components:
 
1094
            # Pre-metadir formats change when we are making something that
 
1095
            # needs a metaformat, because clone is used for push.
 
1096
            expected_format = bzrdir.BzrDirMetaFormat1()
 
1097
        if not isinstance(expected_format, RemoteBzrDirFormat):
 
1098
            self.assertEqual(control._format.network_name(),
 
1099
                expected_format.network_name())
 
1100
            self.assertEqual(control._format.network_name(),
 
1101
                opened._format.network_name())
 
1102
        self.assertEqual(control.__class__, opened.__class__)
 
1103
        return repo, control
 
1104
 
 
1105
    def test_format_network_name(self):
 
1106
        # All control formats must have a network name.
 
1107
        dir = self.make_bzrdir('.')
 
1108
        format = dir._format
 
1109
        # We want to test that the network_name matches the actual format on
 
1110
        # disk. For local control dirsthat means that using network_name as a
 
1111
        # key in the registry gives back the same format. For remote obects
 
1112
        # we check that the network_name of the RemoteBzrDirFormat we have
 
1113
        # locally matches the actual format present on disk.
 
1114
        if isinstance(format, RemoteBzrDirFormat):
 
1115
            dir._ensure_real()
 
1116
            real_dir = dir._real_bzrdir
 
1117
            network_name = format.network_name()
 
1118
            self.assertEqual(real_dir._format.network_name(), network_name)
 
1119
        else:
 
1120
            registry = controldir.network_format_registry
 
1121
            network_name = format.network_name()
 
1122
            looked_up_format = registry.get(network_name)
 
1123
            self.assertTrue(
 
1124
                issubclass(format.__class__, looked_up_format.__class__))
 
1125
        # The network name must be a byte string.
 
1126
        self.assertIsInstance(network_name, str)
 
1127
 
 
1128
    def test_open_not_bzrdir(self):
 
1129
        # test the formats specific behaviour for no-content or similar dirs.
 
1130
        self.assertRaises(errors.NotBranchError,
 
1131
                          self.bzrdir_format.open,
 
1132
                          transport.get_transport(self.get_readonly_url()))
 
1133
 
 
1134
    def test_create_branch(self):
 
1135
        # a bzrdir can construct a branch and repository for itself.
 
1136
        if not self.bzrdir_format.is_supported():
 
1137
            # unsupported formats are not loopback testable
 
1138
            # because the default open will not open them and
 
1139
            # they may not be initializable.
 
1140
            return
 
1141
        t = self.get_transport()
 
1142
        made_control = self.bzrdir_format.initialize(t.base)
 
1143
        made_repo = made_control.create_repository()
 
1144
        made_branch = made_control.create_branch()
 
1145
        self.assertIsInstance(made_branch, bzrlib.branch.Branch)
 
1146
        self.assertEqual(made_control, made_branch.bzrdir)
 
1147
 
 
1148
    def test_open_branch(self):
 
1149
        if not self.bzrdir_format.is_supported():
 
1150
            # unsupported formats are not loopback testable
 
1151
            # because the default open will not open them and
 
1152
            # they may not be initializable.
 
1153
            return
 
1154
        t = self.get_transport()
 
1155
        made_control = self.bzrdir_format.initialize(t.base)
 
1156
        made_repo = made_control.create_repository()
 
1157
        made_branch = made_control.create_branch()
 
1158
        opened_branch = made_control.open_branch()
 
1159
        self.assertEqual(made_control, opened_branch.bzrdir)
 
1160
        self.assertIsInstance(opened_branch, made_branch.__class__)
 
1161
        self.assertIsInstance(opened_branch._format, made_branch._format.__class__)
 
1162
 
 
1163
    def test_list_branches(self):
 
1164
        if not self.bzrdir_format.is_supported():
 
1165
            # unsupported formats are not loopback testable
 
1166
            # because the default open will not open them and
 
1167
            # they may not be initializable.
 
1168
            return
 
1169
        t = self.get_transport()
 
1170
        made_control = self.bzrdir_format.initialize(t.base)
 
1171
        made_repo = made_control.create_repository()
 
1172
        made_branch = made_control.create_branch()
 
1173
        branches = made_control.list_branches()
 
1174
        self.assertEquals(1, len(branches))
 
1175
        self.assertEquals(made_branch.base, branches[0].base)
 
1176
        try:
 
1177
            made_control.destroy_branch()
 
1178
        except errors.UnsupportedOperation:
 
1179
            pass # Not all bzrdirs support destroying directories
 
1180
        else:
 
1181
            self.assertEquals([], made_control.list_branches())
 
1182
 
 
1183
    def test_create_repository(self):
 
1184
        # a bzrdir can construct a repository for itself.
 
1185
        if not self.bzrdir_format.is_supported():
 
1186
            # unsupported formats are not loopback testable
 
1187
            # because the default open will not open them and
 
1188
            # they may not be initializable.
 
1189
            return
 
1190
        t = self.get_transport()
 
1191
        made_control = self.bzrdir_format.initialize(t.base)
 
1192
        made_repo = made_control.create_repository()
 
1193
        # Check that we have a repository object.
 
1194
        made_repo.has_revision('foo')
 
1195
        self.assertEqual(made_control, made_repo.bzrdir)
 
1196
 
 
1197
    def test_create_repository_shared(self):
 
1198
        # a bzrdir can create a shared repository or
 
1199
        # fail appropriately
 
1200
        if not self.bzrdir_format.is_supported():
 
1201
            # unsupported formats are not loopback testable
 
1202
            # because the default open will not open them and
 
1203
            # they may not be initializable.
 
1204
            return
 
1205
        t = self.get_transport()
 
1206
        made_control = self.bzrdir_format.initialize(t.base)
 
1207
        try:
 
1208
            made_repo = made_control.create_repository(shared=True)
 
1209
        except errors.IncompatibleFormat:
 
1210
            # Old bzrdir formats don't support shared repositories
 
1211
            # and should raise IncompatibleFormat
 
1212
            return
 
1213
        self.assertTrue(made_repo.is_shared())
 
1214
 
 
1215
    def test_create_repository_nonshared(self):
 
1216
        # a bzrdir can create a non-shared repository
 
1217
        if not self.bzrdir_format.is_supported():
 
1218
            # unsupported formats are not loopback testable
 
1219
            # because the default open will not open them and
 
1220
            # they may not be initializable.
 
1221
            return
 
1222
        t = self.get_transport()
 
1223
        made_control = self.bzrdir_format.initialize(t.base)
 
1224
        made_repo = made_control.create_repository(shared=False)
 
1225
        self.assertFalse(made_repo.is_shared())
 
1226
 
 
1227
    def test_open_repository(self):
 
1228
        if not self.bzrdir_format.is_supported():
 
1229
            # unsupported formats are not loopback testable
 
1230
            # because the default open will not open them and
 
1231
            # they may not be initializable.
 
1232
            return
 
1233
        t = self.get_transport()
 
1234
        made_control = self.bzrdir_format.initialize(t.base)
 
1235
        made_repo = made_control.create_repository()
 
1236
        opened_repo = made_control.open_repository()
 
1237
        self.assertEqual(made_control, opened_repo.bzrdir)
 
1238
        self.assertIsInstance(opened_repo, made_repo.__class__)
 
1239
        self.assertIsInstance(opened_repo._format, made_repo._format.__class__)
 
1240
 
 
1241
    def test_create_workingtree(self):
 
1242
        # a bzrdir can construct a working tree for itself.
 
1243
        if not self.bzrdir_format.is_supported():
 
1244
            # unsupported formats are not loopback testable
 
1245
            # because the default open will not open them and
 
1246
            # they may not be initializable.
 
1247
            return
 
1248
        t = self.get_transport()
 
1249
        made_control = self.bzrdir_format.initialize(t.base)
 
1250
        made_repo = made_control.create_repository()
 
1251
        made_branch = made_control.create_branch()
 
1252
        made_tree = self.createWorkingTreeOrSkip(made_control)
 
1253
        self.assertIsInstance(made_tree, workingtree.WorkingTree)
 
1254
        self.assertEqual(made_control, made_tree.bzrdir)
 
1255
 
 
1256
    def test_create_workingtree_revision(self):
 
1257
        # a bzrdir can construct a working tree for itself @ a specific revision.
 
1258
        t = self.get_transport()
 
1259
        source = self.make_branch_and_tree('source')
 
1260
        source.commit('a', rev_id='a', allow_pointless=True)
 
1261
        source.commit('b', rev_id='b', allow_pointless=True)
 
1262
        t.mkdir('new')
 
1263
        t_new = t.clone('new')
 
1264
        made_control = self.bzrdir_format.initialize_on_transport(t_new)
 
1265
        source.branch.repository.clone(made_control)
 
1266
        source.branch.clone(made_control)
 
1267
        try:
 
1268
            made_tree = made_control.create_workingtree(revision_id='a')
 
1269
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1270
            raise TestSkipped("Can't make working tree on transport %r" % t)
 
1271
        self.assertEqual(['a'], made_tree.get_parent_ids())
 
1272
 
 
1273
    def test_open_workingtree(self):
 
1274
        if not self.bzrdir_format.is_supported():
 
1275
            # unsupported formats are not loopback testable
 
1276
            # because the default open will not open them and
 
1277
            # they may not be initializable.
 
1278
            return
 
1279
        # this has to be tested with local access as we still support creating
 
1280
        # format 6 bzrdirs
 
1281
        t = self.get_transport()
 
1282
        try:
 
1283
            made_control = self.bzrdir_format.initialize(t.base)
 
1284
            made_repo = made_control.create_repository()
 
1285
            made_branch = made_control.create_branch()
 
1286
            made_tree = made_control.create_workingtree()
 
1287
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1288
            raise TestSkipped("Can't initialize %r on transport %r"
 
1289
                              % (self.bzrdir_format, t))
 
1290
        opened_tree = made_control.open_workingtree()
 
1291
        self.assertEqual(made_control, opened_tree.bzrdir)
 
1292
        self.assertIsInstance(opened_tree, made_tree.__class__)
 
1293
        self.assertIsInstance(opened_tree._format, made_tree._format.__class__)
 
1294
 
 
1295
    def test_get_selected_branch(self):
 
1296
        # The segment parameters are accessible from the root transport
 
1297
        # if a URL with segment parameters is opened.
 
1298
        if not self.bzrdir_format.is_supported():
 
1299
            # unsupported formats are not loopback testable
 
1300
            # because the default open will not open them and
 
1301
            # they may not be initializable.
 
1302
            return
 
1303
        t = self.get_transport()
 
1304
        try:
 
1305
            made_control = self.bzrdir_format.initialize(t.base)
 
1306
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1307
            raise TestSkipped("Can't initialize %r on transport %r"
 
1308
                              % (self.bzrdir_format, t))
 
1309
        dir = bzrdir.BzrDir.open(t.base+",branch=foo")
 
1310
        self.assertEquals({"branch": "foo"},
 
1311
            dir.user_transport.get_segment_parameters())
 
1312
        self.assertEquals("foo", dir._get_selected_branch())
 
1313
 
 
1314
    def test_get_selected_branch_none_selected(self):
 
1315
        # _get_selected_branch defaults to None
 
1316
        if not self.bzrdir_format.is_supported():
 
1317
            # unsupported formats are not loopback testable
 
1318
            # because the default open will not open them and
 
1319
            # they may not be initializable.
 
1320
            return
 
1321
        t = self.get_transport()
 
1322
        try:
 
1323
            made_control = self.bzrdir_format.initialize(t.base)
 
1324
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1325
            raise TestSkipped("Can't initialize %r on transport %r"
 
1326
                              % (self.bzrdir_format, t))
 
1327
        dir = bzrdir.BzrDir.open(t.base)
 
1328
        self.assertIs(None, dir._get_selected_branch())
 
1329
 
 
1330
    def test_root_transport(self):
 
1331
        dir = self.make_bzrdir('.')
 
1332
        self.assertEqual(dir.root_transport.base,
 
1333
                         self.get_transport().base)
 
1334
 
 
1335
    def test_find_repository_no_repo_under_standalone_branch(self):
 
1336
        # finding a repo stops at standalone branches even if there is a
 
1337
        # higher repository available.
 
1338
        try:
 
1339
            repo = self.make_repository('.', shared=True)
 
1340
        except errors.IncompatibleFormat:
 
1341
            # need a shared repository to test this.
 
1342
            return
 
1343
        url = self.get_url('intermediate')
 
1344
        t = self.get_transport()
 
1345
        t.mkdir('intermediate')
 
1346
        t.mkdir('intermediate/child')
 
1347
        made_control = self.bzrdir_format.initialize(url)
 
1348
        made_control.create_repository()
 
1349
        innermost_control = self.bzrdir_format.initialize(
 
1350
            self.get_url('intermediate/child'))
 
1351
        try:
 
1352
            child_repo = innermost_control.open_repository()
 
1353
            # if there is a repository, then the format cannot ever hit this
 
1354
            # code path.
 
1355
            return
 
1356
        except errors.NoRepositoryPresent:
 
1357
            pass
 
1358
        self.assertRaises(errors.NoRepositoryPresent,
 
1359
                          innermost_control.find_repository)
 
1360
 
 
1361
    def test_find_repository_containing_shared_repository(self):
 
1362
        # find repo inside a shared repo with an empty control dir
 
1363
        # returns the shared repo.
 
1364
        try:
 
1365
            repo = self.make_repository('.', shared=True)
 
1366
        except errors.IncompatibleFormat:
 
1367
            # need a shared repository to test this.
 
1368
            return
 
1369
        url = self.get_url('childbzrdir')
 
1370
        self.get_transport().mkdir('childbzrdir')
 
1371
        made_control = self.bzrdir_format.initialize(url)
 
1372
        try:
 
1373
            child_repo = made_control.open_repository()
 
1374
            # if there is a repository, then the format cannot ever hit this
 
1375
            # code path.
 
1376
            return
 
1377
        except errors.NoRepositoryPresent:
 
1378
            pass
 
1379
        found_repo = made_control.find_repository()
 
1380
        self.assertEqual(repo.bzrdir.root_transport.base,
 
1381
                         found_repo.bzrdir.root_transport.base)
 
1382
 
 
1383
    def test_find_repository_standalone_with_containing_shared_repository(self):
 
1384
        # find repo inside a standalone repo inside a shared repo finds the standalone repo
 
1385
        try:
 
1386
            containing_repo = self.make_repository('.', shared=True)
 
1387
        except errors.IncompatibleFormat:
 
1388
            # need a shared repository to test this.
 
1389
            return
 
1390
        child_repo = self.make_repository('childrepo')
 
1391
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
1392
        found_repo = opened_control.find_repository()
 
1393
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
1394
                         found_repo.bzrdir.root_transport.base)
 
1395
 
 
1396
    def test_find_repository_shared_within_shared_repository(self):
 
1397
        # find repo at a shared repo inside a shared repo finds the inner repo
 
1398
        try:
 
1399
            containing_repo = self.make_repository('.', shared=True)
 
1400
        except errors.IncompatibleFormat:
 
1401
            # need a shared repository to test this.
 
1402
            return
 
1403
        url = self.get_url('childrepo')
 
1404
        self.get_transport().mkdir('childrepo')
 
1405
        child_control = self.bzrdir_format.initialize(url)
 
1406
        child_repo = child_control.create_repository(shared=True)
 
1407
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
1408
        found_repo = opened_control.find_repository()
 
1409
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
1410
                         found_repo.bzrdir.root_transport.base)
 
1411
        self.assertNotEqual(child_repo.bzrdir.root_transport.base,
 
1412
                            containing_repo.bzrdir.root_transport.base)
 
1413
 
 
1414
    def test_find_repository_with_nested_dirs_works(self):
 
1415
        # find repo inside a bzrdir inside a bzrdir inside a shared repo
 
1416
        # finds the outer shared repo.
 
1417
        try:
 
1418
            repo = self.make_repository('.', shared=True)
 
1419
        except errors.IncompatibleFormat:
 
1420
            # need a shared repository to test this.
 
1421
            return
 
1422
        url = self.get_url('intermediate')
 
1423
        t = self.get_transport()
 
1424
        t.mkdir('intermediate')
 
1425
        t.mkdir('intermediate/child')
 
1426
        made_control = self.bzrdir_format.initialize(url)
 
1427
        try:
 
1428
            child_repo = made_control.open_repository()
 
1429
            # if there is a repository, then the format cannot ever hit this
 
1430
            # code path.
 
1431
            return
 
1432
        except errors.NoRepositoryPresent:
 
1433
            pass
 
1434
        innermost_control = self.bzrdir_format.initialize(
 
1435
            self.get_url('intermediate/child'))
 
1436
        try:
 
1437
            child_repo = innermost_control.open_repository()
 
1438
            # if there is a repository, then the format cannot ever hit this
 
1439
            # code path.
 
1440
            return
 
1441
        except errors.NoRepositoryPresent:
 
1442
            pass
 
1443
        found_repo = innermost_control.find_repository()
 
1444
        self.assertEqual(repo.bzrdir.root_transport.base,
 
1445
                         found_repo.bzrdir.root_transport.base)
 
1446
 
 
1447
    def test_can_and_needs_format_conversion(self):
 
1448
        # check that we can ask an instance if its upgradable
 
1449
        dir = self.make_bzrdir('.')
 
1450
        if dir.can_convert_format():
 
1451
            # if its default updatable there must be an updater
 
1452
            # (we force the latest known format as downgrades may not be
 
1453
            # available
 
1454
            self.assertTrue(isinstance(dir._format.get_converter(
 
1455
                format=dir._format), controldir.Converter))
 
1456
        dir.needs_format_conversion(
 
1457
            controldir.ControlDirFormat.get_default_format())
 
1458
 
 
1459
    def test_backup_copies_existing(self):
 
1460
        tree = self.make_branch_and_tree('test')
 
1461
        self.build_tree(['test/a'])
 
1462
        tree.add(['a'], ['a-id'])
 
1463
        tree.commit('some data to be copied.')
 
1464
        old_url, new_url = tree.bzrdir.backup_bzrdir()
 
1465
        old_path = urlutils.local_path_from_url(old_url)
 
1466
        new_path = urlutils.local_path_from_url(new_url)
 
1467
        self.assertPathExists(old_path)
 
1468
        self.assertPathExists(new_path)
 
1469
        for (((dir_relpath1, _), entries1),
 
1470
             ((dir_relpath2, _), entries2)) in izip(
 
1471
                osutils.walkdirs(old_path),
 
1472
                osutils.walkdirs(new_path)):
 
1473
            self.assertEquals(dir_relpath1, dir_relpath2)
 
1474
            for f1, f2 in zip(entries1, entries2):
 
1475
                self.assertEquals(f1[0], f2[0])
 
1476
                self.assertEquals(f1[2], f2[2])
 
1477
                if f1[2] == "file":
 
1478
                    osutils.compare_files(open(f1[4]), open(f2[4]))
 
1479
 
 
1480
    def test_upgrade_new_instance(self):
 
1481
        """Does an available updater work?"""
 
1482
        dir = self.make_bzrdir('.')
 
1483
        # for now, upgrade is not ready for partial bzrdirs.
 
1484
        dir.create_repository()
 
1485
        dir.create_branch()
 
1486
        self.createWorkingTreeOrSkip(dir)
 
1487
        if dir.can_convert_format():
 
1488
            # if its default updatable there must be an updater
 
1489
            # (we force the latest known format as downgrades may not be
 
1490
            # available
 
1491
            pb = ui.ui_factory.nested_progress_bar()
 
1492
            try:
 
1493
                dir._format.get_converter(format=dir._format).convert(dir, pb)
 
1494
            finally:
 
1495
                pb.finished()
 
1496
            # and it should pass 'check' now.
 
1497
            check.check_dwim(self.get_url('.'), False, True, True)
 
1498
 
 
1499
    def test_format_description(self):
 
1500
        dir = self.make_bzrdir('.')
 
1501
        text = dir._format.get_format_description()
 
1502
        self.assertTrue(len(text))
 
1503
 
 
1504
 
 
1505
class TestBreakLock(TestCaseWithControlDir):
 
1506
 
 
1507
    def test_break_lock_empty(self):
 
1508
        # break lock on an empty bzrdir should work silently.
 
1509
        dir = self.make_bzrdir('.')
 
1510
        try:
 
1511
            dir.break_lock()
 
1512
        except NotImplementedError:
 
1513
            pass
 
1514
 
 
1515
    def test_break_lock_repository(self):
 
1516
        # break lock with just a repo should unlock the repo.
 
1517
        repo = self.make_repository('.')
 
1518
        repo.lock_write()
 
1519
        lock_repo = repo.bzrdir.open_repository()
 
1520
        if not lock_repo.get_physical_lock_status():
 
1521
            # This bzrdir's default repository does not physically lock things
 
1522
            # and thus this interaction cannot be tested at the interface
 
1523
            # level.
 
1524
            repo.unlock()
 
1525
            return
 
1526
        # only one yes needed here: it should only be unlocking
 
1527
        # the repo
 
1528
        bzrlib.ui.ui_factory = CannedInputUIFactory([True])
 
1529
        try:
 
1530
            repo.bzrdir.break_lock()
 
1531
        except NotImplementedError:
 
1532
            # this bzrdir does not implement break_lock - so we cant test it.
 
1533
            repo.unlock()
 
1534
            return
 
1535
        lock_repo.lock_write()
 
1536
        lock_repo.unlock()
 
1537
        self.assertRaises(errors.LockBroken, repo.unlock)
 
1538
 
 
1539
    def test_break_lock_branch(self):
 
1540
        # break lock with just a repo should unlock the branch.
 
1541
        # and not directly try the repository.
 
1542
        # we test this by making a branch reference to a branch
 
1543
        # and repository in another bzrdir
 
1544
        # for pre-metadir formats this will fail, thats ok.
 
1545
        master = self.make_branch('branch')
 
1546
        thisdir = self.make_bzrdir('this')
 
1547
        try:
 
1548
            bzrlib.branch.BranchReferenceFormat().initialize(
 
1549
                thisdir, target_branch=master)
 
1550
        except errors.IncompatibleFormat:
 
1551
            return
 
1552
        unused_repo = thisdir.create_repository()
 
1553
        master.lock_write()
 
1554
        unused_repo.lock_write()
 
1555
        try:
 
1556
            # two yes's : branch and repository. If the repo in this
 
1557
            # dir is inappropriately accessed, 3 will be needed, and
 
1558
            # we'll see that because the stream will be fully consumed
 
1559
            bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
1560
            # determine if the repository will have been locked;
 
1561
            this_repo_locked = \
 
1562
                thisdir.open_repository().get_physical_lock_status()
 
1563
            master.bzrdir.break_lock()
 
1564
            if this_repo_locked:
 
1565
                # only two ys should have been read
 
1566
                self.assertEqual([True],
 
1567
                    bzrlib.ui.ui_factory.responses)
 
1568
            else:
 
1569
                # only one y should have been read
 
1570
                self.assertEqual([True, True],
 
1571
                    bzrlib.ui.ui_factory.responses)
 
1572
            # we should be able to lock a newly opened branch now
 
1573
            branch = master.bzrdir.open_branch()
 
1574
            branch.lock_write()
 
1575
            branch.unlock()
 
1576
            if this_repo_locked:
 
1577
                # we should not be able to lock the repository in thisdir as
 
1578
                # its still held by the explicit lock we took, and the break
 
1579
                # lock should not have touched it.
 
1580
                repo = thisdir.open_repository()
 
1581
                self.assertRaises(errors.LockContention, repo.lock_write)
 
1582
        finally:
 
1583
            unused_repo.unlock()
 
1584
        self.assertRaises(errors.LockBroken, master.unlock)
 
1585
 
 
1586
    def test_break_lock_tree(self):
 
1587
        # break lock with a tree should unlock the tree but not try the
 
1588
        # branch explicitly. However this is very hard to test for as we
 
1589
        # dont have a tree reference class, nor is one needed;
 
1590
        # the worst case if this code unlocks twice is an extra question
 
1591
        # being asked.
 
1592
        tree = self.make_branch_and_tree('.')
 
1593
        tree.lock_write()
 
1594
        # three yes's : tree, branch and repository.
 
1595
        bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
1596
        try:
 
1597
            tree.bzrdir.break_lock()
 
1598
        except (NotImplementedError, errors.LockActive):
 
1599
            # bzrdir does not support break_lock
 
1600
            # or one of the locked objects (currently only tree does this)
 
1601
            # raised a LockActive because we do still have a live locked
 
1602
            # object.
 
1603
            tree.unlock()
 
1604
            return
 
1605
        self.assertEqual([True],
 
1606
                bzrlib.ui.ui_factory.responses)
 
1607
        lock_tree = tree.bzrdir.open_workingtree()
 
1608
        lock_tree.lock_write()
 
1609
        lock_tree.unlock()
 
1610
        self.assertRaises(errors.LockBroken, tree.unlock)
 
1611
 
 
1612
 
 
1613
class TestTransportConfig(TestCaseWithControlDir):
 
1614
 
 
1615
    def test_get_config(self):
 
1616
        my_dir = self.make_bzrdir('.')
 
1617
        config = my_dir.get_config()
 
1618
        try:
 
1619
            config.set_default_stack_on('http://example.com')
 
1620
        except errors.BzrError, e:
 
1621
            if 'Cannot set config' in str(e):
 
1622
                self.assertFalse(
 
1623
                    isinstance(my_dir, (bzrdir.BzrDirMeta1, RemoteBzrDir)),
 
1624
                    "%r should support configs" % my_dir)
 
1625
                raise TestNotApplicable(
 
1626
                    'This BzrDir format does not support configs.')
 
1627
            else:
 
1628
                raise
 
1629
        self.assertEqual('http://example.com', config.get_default_stack_on())
 
1630
        my_dir2 = bzrdir.BzrDir.open(self.get_url('.'))
 
1631
        config2 = my_dir2.get_config()
 
1632
        self.assertEqual('http://example.com', config2.get_default_stack_on())
 
1633
 
 
1634
 
 
1635
class ChrootedControlDirTests(ChrootedTestCase):
 
1636
 
 
1637
    def test_find_repository_no_repository(self):
 
1638
        # loopback test to check the current format fails to find a
 
1639
        # share repository correctly.
 
1640
        if not self.bzrdir_format.is_supported():
 
1641
            # unsupported formats are not loopback testable
 
1642
            # because the default open will not open them and
 
1643
            # they may not be initializable.
 
1644
            return
 
1645
        # supported formats must be able to init and open
 
1646
        # - do the vfs initialisation over the basic vfs transport
 
1647
        # XXX: TODO this should become a 'bzrdirlocation' api call.
 
1648
        url = self.get_vfs_only_url('subdir')
 
1649
        transport.get_transport_from_url(self.get_vfs_only_url()).mkdir(
 
1650
            'subdir')
 
1651
        made_control = self.bzrdir_format.initialize(self.get_url('subdir'))
 
1652
        try:
 
1653
            repo = made_control.open_repository()
 
1654
            # if there is a repository, then the format cannot ever hit this
 
1655
            # code path.
 
1656
            return
 
1657
        except errors.NoRepositoryPresent:
 
1658
            pass
 
1659
        made_control = bzrdir.BzrDir.open(self.get_readonly_url('subdir'))
 
1660
        self.assertRaises(errors.NoRepositoryPresent,
 
1661
                          made_control.find_repository)
 
1662
 
 
1663
 
 
1664
class TestControlDirControlComponent(TestCaseWithControlDir):
 
1665
    """ControlDir implementations adequately implement ControlComponent."""
 
1666
 
 
1667
    def test_urls(self):
 
1668
        bd = self.make_bzrdir('bd')
 
1669
        self.assertIsInstance(bd.user_url, str)
 
1670
        self.assertEqual(bd.user_url, bd.user_transport.base)
 
1671
        # for all current bzrdir implementations the user dir must be 
 
1672
        # above the control dir but we might need to relax that?
 
1673
        self.assertEqual(bd.control_url.find(bd.user_url), 0)
 
1674
        self.assertEqual(bd.control_url, bd.control_transport.base)
 
1675
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
1676
# GNU General Public License for more details.
 
1677
#
 
1678
# You should have received a copy of the GNU General Public License
 
1679
# along with this program; if not, write to the Free Software
 
1680
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
1681
 
 
1682
"""Tests for control directory implementations - tests a controldir format."""
 
1683
 
 
1684
from itertools import izip
 
1685
 
 
1686
import bzrlib.branch
 
1687
from bzrlib import (
 
1688
    bzrdir,
 
1689
    check,
 
1690
    controldir,
 
1691
    errors,
 
1692
    gpg,
 
1693
    osutils,
 
1694
    transport,
 
1695
    ui,
 
1696
    urlutils,
 
1697
    workingtree,
 
1698
    )
 
1699
import bzrlib.revision
 
1700
from bzrlib.tests import (
 
1701
    fixtures,
 
1702
    ChrootedTestCase,
 
1703
    TestNotApplicable,
 
1704
    TestSkipped,
 
1705
    )
 
1706
from bzrlib.tests.per_controldir import TestCaseWithControlDir
 
1707
from bzrlib.transport.local import LocalTransport
 
1708
from bzrlib.ui import (
 
1709
    CannedInputUIFactory,
 
1710
    )
 
1711
from bzrlib.remote import (
 
1712
    RemoteBzrDir,
 
1713
    RemoteBzrDirFormat,
 
1714
    RemoteRepository,
 
1715
    )
 
1716
 
 
1717
 
 
1718
class TestControlDir(TestCaseWithControlDir):
 
1719
 
 
1720
    def skipIfNoWorkingTree(self, a_bzrdir):
 
1721
        """Raises TestSkipped if a_bzrdir doesn't have a working tree.
 
1722
 
 
1723
        If the bzrdir does have a workingtree, this is a no-op.
 
1724
        """
 
1725
        try:
 
1726
            a_bzrdir.open_workingtree()
 
1727
        except (errors.NotLocalUrl, errors.NoWorkingTree):
 
1728
            raise TestSkipped("bzrdir on transport %r has no working tree"
 
1729
                              % a_bzrdir.transport)
 
1730
 
 
1731
    def openWorkingTreeIfLocal(self, a_bzrdir):
 
1732
        """If a_bzrdir is on a local transport, call open_workingtree() on it.
 
1733
        """
 
1734
        if not isinstance(a_bzrdir.root_transport, LocalTransport):
 
1735
            # it's not local, but that's ok
 
1736
            return
 
1737
        a_bzrdir.open_workingtree()
 
1738
 
 
1739
    def createWorkingTreeOrSkip(self, a_bzrdir):
 
1740
        """Create a working tree on a_bzrdir, or raise TestSkipped.
 
1741
 
 
1742
        A simple wrapper for create_workingtree that translates NotLocalUrl into
 
1743
        TestSkipped.  Returns the newly created working tree.
 
1744
        """
 
1745
        try:
 
1746
            return a_bzrdir.create_workingtree()
 
1747
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1748
            raise TestSkipped("cannot make working tree with transport %r"
 
1749
                              % a_bzrdir.transport)
 
1750
 
 
1751
    def sproutOrSkip(self, from_bzrdir, to_url, revision_id=None,
 
1752
                     force_new_repo=False, accelerator_tree=None,
 
1753
                     create_tree_if_local=True):
 
1754
        """Sprout from_bzrdir into to_url, or raise TestSkipped.
 
1755
 
 
1756
        A simple wrapper for from_bzrdir.sprout that translates NotLocalUrl into
 
1757
        TestSkipped.  Returns the newly sprouted bzrdir.
 
1758
        """
 
1759
        to_transport = transport.get_transport(to_url)
 
1760
        if not isinstance(to_transport, LocalTransport):
 
1761
            raise TestSkipped('Cannot sprout to remote bzrdirs.')
 
1762
        target = from_bzrdir.sprout(to_url, revision_id=revision_id,
 
1763
                                    force_new_repo=force_new_repo,
 
1764
                                    possible_transports=[to_transport],
 
1765
                                    accelerator_tree=accelerator_tree,
 
1766
                                    create_tree_if_local=create_tree_if_local)
 
1767
        return target
 
1768
 
 
1769
    def test_create_null_workingtree(self):
 
1770
        dir = self.make_bzrdir('dir1')
 
1771
        dir.create_repository()
 
1772
        dir.create_branch()
 
1773
        try:
 
1774
            wt = dir.create_workingtree(revision_id=bzrlib.revision.NULL_REVISION)
 
1775
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
1776
            raise TestSkipped("cannot make working tree with transport %r"
 
1777
                              % dir.transport)
 
1778
        self.assertEqual([], wt.get_parent_ids())
 
1779
 
 
1780
    def test_destroy_workingtree(self):
 
1781
        tree = self.make_branch_and_tree('tree')
 
1782
        self.build_tree(['tree/file'])
 
1783
        tree.add('file')
 
1784
        tree.commit('first commit')
 
1785
        bzrdir = tree.bzrdir
 
1786
        try:
 
1787
            bzrdir.destroy_workingtree()
 
1788
        except errors.UnsupportedOperation:
 
1789
            raise TestSkipped('Format does not support destroying tree')
 
1790
        self.assertPathDoesNotExist('tree/file')
 
1791
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
1792
        bzrdir.create_workingtree()
 
1793
        self.assertPathExists('tree/file')
 
1794
        bzrdir.destroy_workingtree_metadata()
 
1795
        self.assertPathExists('tree/file')
 
1796
        self.assertRaises(errors.NoWorkingTree, bzrdir.open_workingtree)
 
1797
 
 
1798
    def test_destroy_branch(self):
 
1799
        branch = self.make_branch('branch')
 
1800
        bzrdir = branch.bzrdir
 
1801
        try:
 
1802
            bzrdir.destroy_branch()
 
1803
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
1804
            raise TestNotApplicable('Format does not support destroying branch')
 
1805
        self.assertRaises(errors.NotBranchError, bzrdir.open_branch)
 
1806
        bzrdir.create_branch()
 
1807
        bzrdir.open_branch()
 
1808
 
 
1809
    def test_destroy_repository(self):
 
1810
        repo = self.make_repository('repository')
 
1811
        bzrdir = repo.bzrdir
 
1812
        try:
 
1813
            bzrdir.destroy_repository()
 
1814
        except (errors.UnsupportedOperation, errors.TransportNotPossible):
 
1815
            raise TestNotApplicable('Format does not support destroying'
 
1816
                                    ' repository')
 
1817
        self.assertRaises(errors.NoRepositoryPresent, bzrdir.open_repository)
 
1818
        bzrdir.create_repository()
 
1819
        bzrdir.open_repository()
 
1820
 
 
1821
    def test_open_workingtree_raises_no_working_tree(self):
 
1822
        """ControlDir.open_workingtree() should raise NoWorkingTree (rather than
 
1823
        e.g. NotLocalUrl) if there is no working tree.
 
1824
        """
 
1825
        dir = self.make_bzrdir('source')
 
1826
        vfs_dir = bzrdir.BzrDir.open(self.get_vfs_only_url('source'))
 
1827
        if vfs_dir.has_workingtree():
 
1828
            # This ControlDir format doesn't support ControlDirs without
 
1829
            # working trees, so this test is irrelevant.
 
1830
            return
 
1831
        self.assertRaises(errors.NoWorkingTree, dir.open_workingtree)
 
1832
 
 
1833
    def test_clone_bzrdir_repository_under_shared(self):
 
1834
        tree = self.make_branch_and_tree('commit_tree')
 
1835
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
1836
        tree.add('foo')
 
1837
        tree.commit('revision 1', rev_id='1')
 
1838
        dir = self.make_bzrdir('source')
 
1839
        repo = dir.create_repository()
 
1840
        repo.fetch(tree.branch.repository)
 
1841
        self.assertTrue(repo.has_revision('1'))
 
1842
        try:
 
1843
            self.make_repository('target', shared=True)
 
1844
        except errors.IncompatibleFormat:
 
1845
            return
 
1846
        target = dir.clone(self.get_url('target/child'))
 
1847
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1848
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
1849
 
 
1850
    def test_clone_bzrdir_repository_branch_both_under_shared(self):
 
1851
        # Create a shared repository
 
1852
        try:
 
1853
            shared_repo = self.make_repository('shared', shared=True)
 
1854
        except errors.IncompatibleFormat:
 
1855
            return
 
1856
        # Make a branch, 'commit_tree', and working tree outside of the shared
 
1857
        # repository, and commit some revisions to it.
 
1858
        tree = self.make_branch_and_tree('commit_tree')
 
1859
        self.build_tree(['foo'], transport=tree.bzrdir.root_transport)
 
1860
        tree.add('foo')
 
1861
        tree.commit('revision 1', rev_id='1')
 
1862
        tree.bzrdir.open_branch().generate_revision_history(
 
1863
            bzrlib.revision.NULL_REVISION)
 
1864
        tree.set_parent_trees([])
 
1865
        tree.commit('revision 2', rev_id='2')
 
1866
        # Copy the content (i.e. revisions) from the 'commit_tree' branch's
 
1867
        # repository into the shared repository.
 
1868
        tree.branch.repository.copy_content_into(shared_repo)
 
1869
        # Make a branch 'source' inside the shared repository.
 
1870
        dir = self.make_bzrdir('shared/source')
 
1871
        dir.create_branch()
 
1872
        # Clone 'source' to 'target', also inside the shared repository.
 
1873
        target = dir.clone(self.get_url('shared/target'))
 
1874
        # 'source', 'target', and the shared repo all have distinct bzrdirs.
 
1875
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1876
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
1877
        # The shared repository will contain revisions from the 'commit_tree'
 
1878
        # repository, even revisions that are not part of the history of the
 
1879
        # 'commit_tree' branch.
 
1880
        self.assertTrue(shared_repo.has_revision('1'))
 
1881
 
 
1882
    def test_clone_bzrdir_repository_branch_only_source_under_shared(self):
 
1883
        try:
 
1884
            shared_repo = self.make_repository('shared', shared=True)
 
1885
        except errors.IncompatibleFormat:
 
1886
            return
 
1887
        tree = self.make_branch_and_tree('commit_tree')
 
1888
        self.build_tree(['commit_tree/foo'])
 
1889
        tree.add('foo')
 
1890
        tree.commit('revision 1', rev_id='1')
 
1891
        tree.branch.bzrdir.open_branch().generate_revision_history(
 
1892
            bzrlib.revision.NULL_REVISION)
 
1893
        tree.set_parent_trees([])
 
1894
        tree.commit('revision 2', rev_id='2')
 
1895
        tree.branch.repository.copy_content_into(shared_repo)
 
1896
        if shared_repo.make_working_trees():
 
1897
            shared_repo.set_make_working_trees(False)
 
1898
            self.assertFalse(shared_repo.make_working_trees())
 
1899
        self.assertTrue(shared_repo.has_revision('1'))
 
1900
        dir = self.make_bzrdir('shared/source')
 
1901
        dir.create_branch()
 
1902
        target = dir.clone(self.get_url('target'))
 
1903
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1904
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
1905
        branch = target.open_branch()
 
1906
        self.assertTrue(branch.repository.has_revision('1'))
 
1907
        self.assertFalse(branch.repository.make_working_trees())
 
1908
        self.assertTrue(branch.repository.is_shared())
 
1909
 
 
1910
    def test_clone_bzrdir_repository_revision(self):
 
1911
        # test for revision limiting, [smoke test, not corner case checks].
 
1912
        # make a repository with some revisions,
 
1913
        # and clone it with a revision limit.
 
1914
        #
 
1915
        tree = self.make_branch_and_tree('commit_tree')
 
1916
        self.build_tree(['commit_tree/foo'])
 
1917
        tree.add('foo')
 
1918
        tree.commit('revision 1', rev_id='1')
 
1919
        tree.branch.bzrdir.open_branch().generate_revision_history(
 
1920
            bzrlib.revision.NULL_REVISION)
 
1921
        tree.set_parent_trees([])
 
1922
        tree.commit('revision 2', rev_id='2')
 
1923
        source = self.make_repository('source')
 
1924
        tree.branch.repository.copy_content_into(source)
 
1925
        dir = source.bzrdir
 
1926
        target = dir.clone(self.get_url('target'), revision_id='2')
 
1927
        raise TestSkipped('revision limiting not strict yet')
 
1928
 
 
1929
    def test_clone_bzrdir_branch_and_repo_fixed_user_id(self):
 
1930
        # Bug #430868 is about an email containing '.sig'
 
1931
        self.overrideEnv('BZR_EMAIL', 'murphy@host.sighup.org')
 
1932
        tree = self.make_branch_and_tree('commit_tree')
 
1933
        self.build_tree(['commit_tree/foo'])
 
1934
        tree.add('foo')
 
1935
        rev1 = tree.commit('revision 1')
 
1936
        tree_repo = tree.branch.repository
 
1937
        tree_repo.lock_write()
 
1938
        tree_repo.start_write_group()
 
1939
        tree_repo.sign_revision(rev1, gpg.LoopbackGPGStrategy(None))
 
1940
        tree_repo.commit_write_group()
 
1941
        tree_repo.unlock()
 
1942
        target = self.make_branch('target')
 
1943
        tree.branch.repository.copy_content_into(target.repository)
 
1944
        tree.branch.copy_content_into(target)
 
1945
        self.assertTrue(target.repository.has_revision(rev1))
 
1946
        self.assertEqual(
 
1947
            tree_repo.get_signature_text(rev1),
 
1948
            target.repository.get_signature_text(rev1))
 
1949
 
 
1950
    def test_clone_bzrdir_branch_and_repo_into_shared_repo(self):
 
1951
        # by default cloning into a shared repo uses the shared repo.
 
1952
        tree = self.make_branch_and_tree('commit_tree')
 
1953
        self.build_tree(['commit_tree/foo'])
 
1954
        tree.add('foo')
 
1955
        tree.commit('revision 1')
 
1956
        source = self.make_branch('source')
 
1957
        tree.branch.repository.copy_content_into(source.repository)
 
1958
        tree.branch.copy_content_into(source)
 
1959
        try:
 
1960
            self.make_repository('target', shared=True)
 
1961
        except errors.IncompatibleFormat:
 
1962
            return
 
1963
        dir = source.bzrdir
 
1964
        target = dir.clone(self.get_url('target/child'))
 
1965
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
1966
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
1967
        self.assertEqual(source.revision_history(),
 
1968
                         target.open_branch().revision_history())
 
1969
 
 
1970
    def test_clone_bzrdir_branch_revision(self):
 
1971
        # test for revision limiting, [smoke test, not corner case checks].
 
1972
        # make a branch with some revisions,
 
1973
        # and clone it with a revision limit.
 
1974
        #
 
1975
        tree = self.make_branch_and_tree('commit_tree')
 
1976
        self.build_tree(['commit_tree/foo'])
 
1977
        tree.add('foo')
 
1978
        tree.commit('revision 1', rev_id='1')
 
1979
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
1980
        source = self.make_branch('source')
 
1981
        tree.branch.repository.copy_content_into(source.repository)
 
1982
        tree.branch.copy_content_into(source)
 
1983
        dir = source.bzrdir
 
1984
        target = dir.clone(self.get_url('target'), revision_id='1')
 
1985
        self.assertEqual('1', target.open_branch().last_revision())
 
1986
 
 
1987
    def test_clone_on_transport_preserves_repo_format(self):
 
1988
        if self.bzrdir_format == bzrdir.format_registry.make_bzrdir('default'):
 
1989
            format = 'knit'
 
1990
        else:
 
1991
            format = None
 
1992
        source_branch = self.make_branch('source', format=format)
 
1993
        # Ensure no format data is cached
 
1994
        a_dir = bzrlib.branch.Branch.open_from_transport(
 
1995
            self.get_transport('source')).bzrdir
 
1996
        target_transport = self.get_transport('target')
 
1997
        target_bzrdir = a_dir.clone_on_transport(target_transport)
 
1998
        target_repo = target_bzrdir.open_repository()
 
1999
        source_branch = bzrlib.branch.Branch.open(
 
2000
            self.get_vfs_only_url('source'))
 
2001
        if isinstance(target_repo, RemoteRepository):
 
2002
            target_repo._ensure_real()
 
2003
            target_repo = target_repo._real_repository
 
2004
        self.assertEqual(target_repo._format, source_branch.repository._format)
 
2005
 
 
2006
    def test_clone_bzrdir_tree_revision(self):
 
2007
        # test for revision limiting, [smoke test, not corner case checks].
 
2008
        # make a tree with a revision with a last-revision
 
2009
        # and clone it with a revision limit.
 
2010
        # This smoke test just checks the revision-id is right. Tree specific
 
2011
        # tests will check corner cases.
 
2012
        tree = self.make_branch_and_tree('source')
 
2013
        self.build_tree(['source/foo'])
 
2014
        tree.add('foo')
 
2015
        tree.commit('revision 1', rev_id='1')
 
2016
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2017
        dir = tree.bzrdir
 
2018
        target = dir.clone(self.get_url('target'), revision_id='1')
 
2019
        self.skipIfNoWorkingTree(target)
 
2020
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
2021
 
 
2022
    def test_clone_bzrdir_into_notrees_repo(self):
 
2023
        """Cloning into a no-trees repo should not create a working tree"""
 
2024
        tree = self.make_branch_and_tree('source')
 
2025
        self.build_tree(['source/foo'])
 
2026
        tree.add('foo')
 
2027
        tree.commit('revision 1')
 
2028
 
 
2029
        try:
 
2030
            repo = self.make_repository('repo', shared=True)
 
2031
        except errors.IncompatibleFormat:
 
2032
            raise TestNotApplicable('must support shared repositories')
 
2033
        if repo.make_working_trees():
 
2034
            repo.set_make_working_trees(False)
 
2035
            self.assertFalse(repo.make_working_trees())
 
2036
 
 
2037
        dir = tree.bzrdir
 
2038
        a_dir = dir.clone(self.get_url('repo/a'))
 
2039
        a_dir.open_branch()
 
2040
        self.assertRaises(errors.NoWorkingTree, a_dir.open_workingtree)
 
2041
 
 
2042
    def test_clone_respects_stacked(self):
 
2043
        branch = self.make_branch('parent')
 
2044
        child_transport = self.get_transport('child')
 
2045
        child = branch.bzrdir.clone_on_transport(child_transport,
 
2046
                                                 stacked_on=branch.base)
 
2047
        self.assertEqual(child.open_branch().get_stacked_on_url(), branch.base)
 
2048
 
 
2049
    def test_get_branch_reference_on_reference(self):
 
2050
        """get_branch_reference should return the right url."""
 
2051
        referenced_branch = self.make_branch('referenced')
 
2052
        dir = self.make_bzrdir('source')
 
2053
        try:
 
2054
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2055
                target_branch=referenced_branch)
 
2056
        except errors.IncompatibleFormat:
 
2057
            # this is ok too, not all formats have to support references.
 
2058
            return
 
2059
        self.assertEqual(referenced_branch.bzrdir.root_transport.abspath('') + '/',
 
2060
            dir.get_branch_reference())
 
2061
 
 
2062
    def test_get_branch_reference_on_non_reference(self):
 
2063
        """get_branch_reference should return None for non-reference branches."""
 
2064
        branch = self.make_branch('referenced')
 
2065
        self.assertEqual(None, branch.bzrdir.get_branch_reference())
 
2066
 
 
2067
    def test_get_branch_reference_no_branch(self):
 
2068
        """get_branch_reference should not mask NotBranchErrors."""
 
2069
        dir = self.make_bzrdir('source')
 
2070
        if dir.has_branch():
 
2071
            # this format does not support branchless bzrdirs.
 
2072
            return
 
2073
        self.assertRaises(errors.NotBranchError, dir.get_branch_reference)
 
2074
 
 
2075
    def test_sprout_bzrdir_empty(self):
 
2076
        dir = self.make_bzrdir('source')
 
2077
        target = dir.sprout(self.get_url('target'))
 
2078
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2079
        # creates a new repository branch and tree
 
2080
        target.open_repository()
 
2081
        target.open_branch()
 
2082
        self.openWorkingTreeIfLocal(target)
 
2083
 
 
2084
    def test_sprout_bzrdir_empty_under_shared_repo(self):
 
2085
        # sprouting an empty dir into a repo uses the repo
 
2086
        dir = self.make_bzrdir('source')
 
2087
        try:
 
2088
            self.make_repository('target', shared=True)
 
2089
        except errors.IncompatibleFormat:
 
2090
            return
 
2091
        target = dir.sprout(self.get_url('target/child'))
 
2092
        self.assertRaises(errors.NoRepositoryPresent, target.open_repository)
 
2093
        target.open_branch()
 
2094
        try:
 
2095
            target.open_workingtree()
 
2096
        except errors.NoWorkingTree:
 
2097
            # Some bzrdirs can never have working trees.
 
2098
            self.assertFalse(target._format.supports_workingtrees)
 
2099
 
 
2100
    def test_sprout_bzrdir_empty_under_shared_repo_force_new(self):
 
2101
        # the force_new_repo parameter should force use of a new repo in an empty
 
2102
        # bzrdir's sprout logic
 
2103
        dir = self.make_bzrdir('source')
 
2104
        try:
 
2105
            self.make_repository('target', shared=True)
 
2106
        except errors.IncompatibleFormat:
 
2107
            return
 
2108
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
2109
        target.open_repository()
 
2110
        target.open_branch()
 
2111
        self.openWorkingTreeIfLocal(target)
 
2112
 
 
2113
    def test_sprout_bzrdir_with_repository_to_shared(self):
 
2114
        tree = self.make_branch_and_tree('commit_tree')
 
2115
        self.build_tree(['commit_tree/foo'])
 
2116
        tree.add('foo')
 
2117
        tree.commit('revision 1', rev_id='1')
 
2118
        tree.bzrdir.open_branch().generate_revision_history(
 
2119
            bzrlib.revision.NULL_REVISION)
 
2120
        tree.set_parent_trees([])
 
2121
        tree.commit('revision 2', rev_id='2')
 
2122
        source = self.make_repository('source')
 
2123
        tree.branch.repository.copy_content_into(source)
 
2124
        dir = source.bzrdir
 
2125
        try:
 
2126
            shared_repo = self.make_repository('target', shared=True)
 
2127
        except errors.IncompatibleFormat:
 
2128
            return
 
2129
        target = dir.sprout(self.get_url('target/child'))
 
2130
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2131
        self.assertTrue(shared_repo.has_revision('1'))
 
2132
 
 
2133
    def test_sprout_bzrdir_repository_branch_both_under_shared(self):
 
2134
        try:
 
2135
            shared_repo = self.make_repository('shared', shared=True)
 
2136
        except errors.IncompatibleFormat:
 
2137
            return
 
2138
        tree = self.make_branch_and_tree('commit_tree')
 
2139
        self.build_tree(['commit_tree/foo'])
 
2140
        tree.add('foo')
 
2141
        tree.commit('revision 1', rev_id='1')
 
2142
        tree.bzrdir.open_branch().generate_revision_history(
 
2143
            bzrlib.revision.NULL_REVISION)
 
2144
        tree.set_parent_trees([])
 
2145
        tree.commit('revision 2', rev_id='2')
 
2146
        tree.branch.repository.copy_content_into(shared_repo)
 
2147
        dir = self.make_bzrdir('shared/source')
 
2148
        dir.create_branch()
 
2149
        target = dir.sprout(self.get_url('shared/target'))
 
2150
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2151
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
2152
        self.assertTrue(shared_repo.has_revision('1'))
 
2153
 
 
2154
    def test_sprout_bzrdir_repository_branch_only_source_under_shared(self):
 
2155
        try:
 
2156
            shared_repo = self.make_repository('shared', shared=True)
 
2157
        except errors.IncompatibleFormat:
 
2158
            return
 
2159
        tree = self.make_branch_and_tree('commit_tree')
 
2160
        self.build_tree(['commit_tree/foo'])
 
2161
        tree.add('foo')
 
2162
        tree.commit('revision 1', rev_id='1')
 
2163
        tree.bzrdir.open_branch().generate_revision_history(
 
2164
            bzrlib.revision.NULL_REVISION)
 
2165
        tree.set_parent_trees([])
 
2166
        tree.commit('revision 2', rev_id='2')
 
2167
        tree.branch.repository.copy_content_into(shared_repo)
 
2168
        if shared_repo.make_working_trees():
 
2169
            shared_repo.set_make_working_trees(False)
 
2170
            self.assertFalse(shared_repo.make_working_trees())
 
2171
        self.assertTrue(shared_repo.has_revision('1'))
 
2172
        dir = self.make_bzrdir('shared/source')
 
2173
        dir.create_branch()
 
2174
        target = dir.sprout(self.get_url('target'))
 
2175
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2176
        self.assertNotEqual(dir.transport.base, shared_repo.bzrdir.transport.base)
 
2177
        branch = target.open_branch()
 
2178
        # The sprouted bzrdir has a branch, so only revisions referenced by
 
2179
        # that branch are copied, rather than the whole repository.  It's an
 
2180
        # empty branch, so none are copied.
 
2181
        self.assertEqual([], branch.repository.all_revision_ids())
 
2182
        if branch.bzrdir._format.supports_workingtrees:
 
2183
            self.assertTrue(branch.repository.make_working_trees())
 
2184
        self.assertFalse(branch.repository.is_shared())
 
2185
 
 
2186
    def test_sprout_bzrdir_repository_under_shared_force_new_repo(self):
 
2187
        tree = self.make_branch_and_tree('commit_tree')
 
2188
        self.build_tree(['commit_tree/foo'])
 
2189
        tree.add('foo')
 
2190
        tree.commit('revision 1', rev_id='1')
 
2191
        tree.bzrdir.open_branch().generate_revision_history(
 
2192
            bzrlib.revision.NULL_REVISION)
 
2193
        tree.set_parent_trees([])
 
2194
        tree.commit('revision 2', rev_id='2')
 
2195
        source = self.make_repository('source')
 
2196
        tree.branch.repository.copy_content_into(source)
 
2197
        dir = source.bzrdir
 
2198
        try:
 
2199
            shared_repo = self.make_repository('target', shared=True)
 
2200
        except errors.IncompatibleFormat:
 
2201
            return
 
2202
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
2203
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2204
        self.assertFalse(shared_repo.has_revision('1'))
 
2205
 
 
2206
    def test_sprout_bzrdir_repository_revision(self):
 
2207
        # test for revision limiting, [smoke test, not corner case checks].
 
2208
        # make a repository with some revisions,
 
2209
        # and sprout it with a revision limit.
 
2210
        #
 
2211
        tree = self.make_branch_and_tree('commit_tree')
 
2212
        self.build_tree(['commit_tree/foo'])
 
2213
        tree.add('foo')
 
2214
        tree.commit('revision 1', rev_id='1')
 
2215
        br = tree.bzrdir.open_branch()
 
2216
        br.set_last_revision_info(0, bzrlib.revision.NULL_REVISION)
 
2217
        tree.set_parent_trees([])
 
2218
        tree.commit('revision 2', rev_id='2')
 
2219
        source = self.make_repository('source')
 
2220
        tree.branch.repository.copy_content_into(source)
 
2221
        dir = source.bzrdir
 
2222
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='2')
 
2223
        raise TestSkipped('revision limiting not strict yet')
 
2224
 
 
2225
    def test_sprout_bzrdir_branch_and_repo_shared(self):
 
2226
        # sprouting a branch with a repo into a shared repo uses the shared
 
2227
        # repo
 
2228
        tree = self.make_branch_and_tree('commit_tree')
 
2229
        self.build_tree(['commit_tree/foo'])
 
2230
        tree.add('foo')
 
2231
        tree.commit('revision 1', rev_id='1')
 
2232
        source = self.make_branch('source')
 
2233
        tree.branch.repository.copy_content_into(source.repository)
 
2234
        tree.bzrdir.open_branch().copy_content_into(source)
 
2235
        dir = source.bzrdir
 
2236
        try:
 
2237
            shared_repo = self.make_repository('target', shared=True)
 
2238
        except errors.IncompatibleFormat:
 
2239
            return
 
2240
        target = dir.sprout(self.get_url('target/child'))
 
2241
        self.assertTrue(shared_repo.has_revision('1'))
 
2242
 
 
2243
    def test_sprout_bzrdir_branch_and_repo_shared_force_new_repo(self):
 
2244
        # sprouting a branch with a repo into a shared repo uses the shared
 
2245
        # repo
 
2246
        tree = self.make_branch_and_tree('commit_tree')
 
2247
        self.build_tree(['commit_tree/foo'])
 
2248
        tree.add('foo')
 
2249
        tree.commit('revision 1', rev_id='1')
 
2250
        source = self.make_branch('source')
 
2251
        tree.branch.repository.copy_content_into(source.repository)
 
2252
        tree.bzrdir.open_branch().copy_content_into(source)
 
2253
        dir = source.bzrdir
 
2254
        try:
 
2255
            shared_repo = self.make_repository('target', shared=True)
 
2256
        except errors.IncompatibleFormat:
 
2257
            return
 
2258
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
2259
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2260
        self.assertFalse(shared_repo.has_revision('1'))
 
2261
 
 
2262
    def test_sprout_bzrdir_branch_reference(self):
 
2263
        # sprouting should create a repository if needed and a sprouted branch.
 
2264
        referenced_branch = self.make_branch('referenced')
 
2265
        dir = self.make_bzrdir('source')
 
2266
        try:
 
2267
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2268
                target_branch=referenced_branch)
 
2269
        except errors.IncompatibleFormat:
 
2270
            # this is ok too, not all formats have to support references.
 
2271
            return
 
2272
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
2273
        target = dir.sprout(self.get_url('target'))
 
2274
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2275
        # we want target to have a branch that is in-place.
 
2276
        self.assertEqual(target, target.open_branch().bzrdir)
 
2277
        # and as we dont support repositories being detached yet, a repo in
 
2278
        # place
 
2279
        target.open_repository()
 
2280
 
 
2281
    def test_sprout_bzrdir_branch_reference_shared(self):
 
2282
        # sprouting should create a repository if needed and a sprouted branch.
 
2283
        referenced_tree = self.make_branch_and_tree('referenced')
 
2284
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
2285
        dir = self.make_bzrdir('source')
 
2286
        try:
 
2287
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2288
                target_branch=referenced_tree.branch)
 
2289
        except errors.IncompatibleFormat:
 
2290
            # this is ok too, not all formats have to support references.
 
2291
            return
 
2292
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
2293
        try:
 
2294
            shared_repo = self.make_repository('target', shared=True)
 
2295
        except errors.IncompatibleFormat:
 
2296
            return
 
2297
        target = dir.sprout(self.get_url('target/child'))
 
2298
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2299
        # we want target to have a branch that is in-place.
 
2300
        self.assertEqual(target, target.open_branch().bzrdir)
 
2301
        # and we want no repository as the target is shared
 
2302
        self.assertRaises(errors.NoRepositoryPresent,
 
2303
                          target.open_repository)
 
2304
        # and we want revision '1' in the shared repo
 
2305
        self.assertTrue(shared_repo.has_revision('1'))
 
2306
 
 
2307
    def test_sprout_bzrdir_branch_reference_shared_force_new_repo(self):
 
2308
        # sprouting should create a repository if needed and a sprouted branch.
 
2309
        referenced_tree = self.make_branch_and_tree('referenced')
 
2310
        referenced_tree.commit('1', rev_id='1', allow_pointless=True)
 
2311
        dir = self.make_bzrdir('source')
 
2312
        try:
 
2313
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2314
                target_branch=referenced_tree.branch)
 
2315
        except errors.IncompatibleFormat:
 
2316
            # this is ok too, not all formats have to support references.
 
2317
            return
 
2318
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
2319
        try:
 
2320
            shared_repo = self.make_repository('target', shared=True)
 
2321
        except errors.IncompatibleFormat:
 
2322
            return
 
2323
        target = dir.sprout(self.get_url('target/child'), force_new_repo=True)
 
2324
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2325
        # we want target to have a branch that is in-place.
 
2326
        self.assertEqual(target, target.open_branch().bzrdir)
 
2327
        # and we want revision '1' in the new repo
 
2328
        self.assertTrue(target.open_repository().has_revision('1'))
 
2329
        # but not the shared one
 
2330
        self.assertFalse(shared_repo.has_revision('1'))
 
2331
 
 
2332
    def test_sprout_bzrdir_branch_revision(self):
 
2333
        # test for revision limiting, [smoke test, not corner case checks].
 
2334
        # make a repository with some revisions,
 
2335
        # and sprout it with a revision limit.
 
2336
        #
 
2337
        tree = self.make_branch_and_tree('commit_tree')
 
2338
        self.build_tree(['commit_tree/foo'])
 
2339
        tree.add('foo')
 
2340
        tree.commit('revision 1', rev_id='1')
 
2341
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2342
        source = self.make_branch('source')
 
2343
        tree.branch.repository.copy_content_into(source.repository)
 
2344
        tree.bzrdir.open_branch().copy_content_into(source)
 
2345
        dir = source.bzrdir
 
2346
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
2347
        self.assertEqual('1', target.open_branch().last_revision())
 
2348
 
 
2349
    def test_sprout_bzrdir_branch_with_tags(self):
 
2350
        # when sprouting a branch all revisions named in the tags are copied
 
2351
        # too.
 
2352
        builder = self.make_branch_builder('source')
 
2353
        source = fixtures.build_branch_with_non_ancestral_rev(builder)
 
2354
        try:
 
2355
            source.tags.set_tag('tag-a', 'rev-2')
 
2356
        except errors.TagsNotSupported:
 
2357
            raise TestNotApplicable('Branch format does not support tags.')
 
2358
        source.get_config().set_user_option('branch.fetch_tags', 'True')
 
2359
        # Now source has a tag not in its ancestry.  Sprout its controldir.
 
2360
        dir = source.bzrdir
 
2361
        target = dir.sprout(self.get_url('target'))
 
2362
        # The tag is present, and so is its revision.
 
2363
        new_branch = target.open_branch()
 
2364
        self.assertEqual('rev-2', new_branch.tags.lookup_tag('tag-a'))
 
2365
        new_branch.repository.get_revision('rev-2')
 
2366
 
 
2367
    def test_sprout_bzrdir_branch_with_absent_tag(self):
 
2368
        # tags referencing absent revisions are copied (and those absent
 
2369
        # revisions do not prevent the sprout.)
 
2370
        builder = self.make_branch_builder('source')
 
2371
        builder.build_commit(message="Rev 1", rev_id='rev-1')
 
2372
        source = builder.get_branch()
 
2373
        try:
 
2374
            source.tags.set_tag('tag-a', 'missing-rev')
 
2375
        except errors.TagsNotSupported:
 
2376
            raise TestNotApplicable('Branch format does not support tags.')
 
2377
        # Now source has a tag pointing to an absent revision.  Sprout its
 
2378
        # controldir.
 
2379
        dir = source.bzrdir
 
2380
        target = dir.sprout(self.get_url('target'))
 
2381
        # The tag is present in the target
 
2382
        new_branch = target.open_branch()
 
2383
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
 
2384
 
 
2385
    def test_sprout_bzrdir_passing_source_branch_with_absent_tag(self):
 
2386
        # tags referencing absent revisions are copied (and those absent
 
2387
        # revisions do not prevent the sprout.)
 
2388
        builder = self.make_branch_builder('source')
 
2389
        builder.build_commit(message="Rev 1", rev_id='rev-1')
 
2390
        source = builder.get_branch()
 
2391
        try:
 
2392
            source.tags.set_tag('tag-a', 'missing-rev')
 
2393
        except errors.TagsNotSupported:
 
2394
            raise TestNotApplicable('Branch format does not support tags.')
 
2395
        # Now source has a tag pointing to an absent revision.  Sprout its
 
2396
        # controldir.
 
2397
        dir = source.bzrdir
 
2398
        target = dir.sprout(self.get_url('target'), source_branch=source)
 
2399
        # The tag is present in the target
 
2400
        new_branch = target.open_branch()
 
2401
        self.assertEqual('missing-rev', new_branch.tags.lookup_tag('tag-a'))
 
2402
 
 
2403
    def test_sprout_bzrdir_passing_rev_not_source_branch_copies_tags(self):
 
2404
        # dir.sprout(..., revision_id='rev1') copies rev1, and all the tags of
 
2405
        # the branch at that bzrdir, the ancestry of all of those, but no other
 
2406
        # revs (not even the tip of the source branch).
 
2407
        builder = self.make_branch_builder('source')
 
2408
        builder.build_commit(message="Base", rev_id='base-rev')
 
2409
        # Make three parallel lines of ancestry off this base.
 
2410
        source = builder.get_branch()
 
2411
        builder.build_commit(message="Rev A1", rev_id='rev-a1')
 
2412
        builder.build_commit(message="Rev A2", rev_id='rev-a2')
 
2413
        builder.build_commit(message="Rev A3", rev_id='rev-a3')
 
2414
        source.set_last_revision_info(1, 'base-rev')
 
2415
        builder.build_commit(message="Rev B1", rev_id='rev-b1')
 
2416
        builder.build_commit(message="Rev B2", rev_id='rev-b2')
 
2417
        builder.build_commit(message="Rev B3", rev_id='rev-b3')
 
2418
        source.set_last_revision_info(1, 'base-rev')
 
2419
        builder.build_commit(message="Rev C1", rev_id='rev-c1')
 
2420
        builder.build_commit(message="Rev C2", rev_id='rev-c2')
 
2421
        builder.build_commit(message="Rev C3", rev_id='rev-c3')
 
2422
        # Set the branch tip to A2
 
2423
        source.set_last_revision_info(3, 'rev-a2')
 
2424
        try:
 
2425
            # Create a tag for B2, and for an absent rev
 
2426
            source.tags.set_tag('tag-non-ancestry', 'rev-b2')
 
2427
            source.tags.set_tag('tag-absent', 'absent-rev')
 
2428
        except errors.TagsNotSupported:
 
2429
            raise TestNotApplicable('Branch format does not support tags.')
 
2430
        source.get_config().set_user_option('branch.fetch_tags', 'True')
 
2431
        # And ask sprout for C2
 
2432
        dir = source.bzrdir
 
2433
        target = dir.sprout(self.get_url('target'), revision_id='rev-c2')
 
2434
        # The tags are present
 
2435
        new_branch = target.open_branch()
 
2436
        self.assertEqual(
 
2437
            {'tag-absent': 'absent-rev', 'tag-non-ancestry': 'rev-b2'},
 
2438
            new_branch.tags.get_tag_dict())
 
2439
        # And the revs for A2, B2 and C2's ancestries are present, but no
 
2440
        # others.
 
2441
        self.assertEqual(
 
2442
            ['base-rev', 'rev-b1', 'rev-b2', 'rev-c1', 'rev-c2'],
 
2443
            sorted(new_branch.repository.all_revision_ids()))
 
2444
 
 
2445
    def test_sprout_bzrdir_tree_branch_reference(self):
 
2446
        # sprouting should create a repository if needed and a sprouted branch.
 
2447
        # the tree state should not be copied.
 
2448
        referenced_branch = self.make_branch('referencced')
 
2449
        dir = self.make_bzrdir('source')
 
2450
        try:
 
2451
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2452
                target_branch=referenced_branch)
 
2453
        except errors.IncompatibleFormat:
 
2454
            # this is ok too, not all formats have to support references.
 
2455
            return
 
2456
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
2457
        tree = self.createWorkingTreeOrSkip(dir)
 
2458
        self.build_tree(['source/subdir/'])
 
2459
        tree.add('subdir')
 
2460
        target = dir.sprout(self.get_url('target'))
 
2461
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2462
        # we want target to have a branch that is in-place.
 
2463
        self.assertEqual(target, target.open_branch().bzrdir)
 
2464
        # and as we dont support repositories being detached yet, a repo in
 
2465
        # place
 
2466
        target.open_repository()
 
2467
        result_tree = target.open_workingtree()
 
2468
        self.assertFalse(result_tree.has_filename('subdir'))
 
2469
 
 
2470
    def test_sprout_bzrdir_tree_branch_reference_revision(self):
 
2471
        # sprouting should create a repository if needed and a sprouted branch.
 
2472
        # the tree state should not be copied but the revision changed,
 
2473
        # and the likewise the new branch should be truncated too
 
2474
        referenced_branch = self.make_branch('referencced')
 
2475
        dir = self.make_bzrdir('source')
 
2476
        try:
 
2477
            reference = bzrlib.branch.BranchReferenceFormat().initialize(dir,
 
2478
                target_branch=referenced_branch)
 
2479
        except errors.IncompatibleFormat:
 
2480
            # this is ok too, not all formats have to support references.
 
2481
            return
 
2482
        self.assertRaises(errors.NoRepositoryPresent, dir.open_repository)
 
2483
        tree = self.createWorkingTreeOrSkip(dir)
 
2484
        self.build_tree(['source/foo'])
 
2485
        tree.add('foo')
 
2486
        tree.commit('revision 1', rev_id='1')
 
2487
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2488
        target = dir.sprout(self.get_url('target'), revision_id='1')
 
2489
        self.skipIfNoWorkingTree(target)
 
2490
        self.assertNotEqual(dir.transport.base, target.transport.base)
 
2491
        # we want target to have a branch that is in-place.
 
2492
        self.assertEqual(target, target.open_branch().bzrdir)
 
2493
        # and as we dont support repositories being detached yet, a repo in
 
2494
        # place
 
2495
        target.open_repository()
 
2496
        # we trust that the working tree sprouting works via the other tests.
 
2497
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
2498
        self.assertEqual('1', target.open_branch().last_revision())
 
2499
 
 
2500
    def test_sprout_bzrdir_tree_revision(self):
 
2501
        # test for revision limiting, [smoke test, not corner case checks].
 
2502
        # make a tree with a revision with a last-revision
 
2503
        # and sprout it with a revision limit.
 
2504
        # This smoke test just checks the revision-id is right. Tree specific
 
2505
        # tests will check corner cases.
 
2506
        tree = self.make_branch_and_tree('source')
 
2507
        self.build_tree(['source/foo'])
 
2508
        tree.add('foo')
 
2509
        tree.commit('revision 1', rev_id='1')
 
2510
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2511
        dir = tree.bzrdir
 
2512
        target = self.sproutOrSkip(dir, self.get_url('target'), revision_id='1')
 
2513
        self.assertEqual(['1'], target.open_workingtree().get_parent_ids())
 
2514
 
 
2515
    def test_sprout_takes_accelerator(self):
 
2516
        tree = self.make_branch_and_tree('source')
 
2517
        self.build_tree(['source/foo'])
 
2518
        tree.add('foo')
 
2519
        tree.commit('revision 1', rev_id='1')
 
2520
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2521
        dir = tree.bzrdir
 
2522
        target = self.sproutOrSkip(dir, self.get_url('target'),
 
2523
                                   accelerator_tree=tree)
 
2524
        self.assertEqual(['2'], target.open_workingtree().get_parent_ids())
 
2525
 
 
2526
    def test_sprout_branch_no_tree(self):
 
2527
        tree = self.make_branch_and_tree('source')
 
2528
        self.build_tree(['source/foo'])
 
2529
        tree.add('foo')
 
2530
        tree.commit('revision 1', rev_id='1')
 
2531
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
2532
        dir = tree.bzrdir
 
2533
        try:
 
2534
            target = dir.sprout(self.get_url('target'),
 
2535
                create_tree_if_local=False)
 
2536
        except errors.MustHaveWorkingTree:
 
2537
            raise TestNotApplicable("control dir format requires working tree")
 
2538
        self.assertPathDoesNotExist('target/foo')
 
2539
        self.assertEqual(tree.branch.last_revision(),
 
2540
                         target.open_branch().last_revision())
 
2541
 
 
2542
    def test_sprout_with_revision_id_uses_default_stack_on(self):
 
2543
        # Make a branch with three commits to stack on.
 
2544
        builder = self.make_branch_builder('stack-on')
 
2545
        builder.start_series()
 
2546
        builder.build_commit(message='Rev 1.', rev_id='rev-1')
 
2547
        builder.build_commit(message='Rev 2.', rev_id='rev-2')
 
2548
        builder.build_commit(message='Rev 3.', rev_id='rev-3')
 
2549
        builder.finish_series()
 
2550
        stack_on = builder.get_branch()
 
2551
        # Make a bzrdir with a default stacking policy to stack on that branch.
 
2552
        config = self.make_bzrdir('policy-dir').get_config()
 
2553
        try:
 
2554
            config.set_default_stack_on(self.get_url('stack-on'))
 
2555
        except errors.BzrError:
 
2556
            raise TestNotApplicable('Only relevant for stackable formats.')
 
2557
        # Sprout the stacked-on branch into the bzrdir.
 
2558
        sprouted = stack_on.bzrdir.sprout(
 
2559
            self.get_url('policy-dir/sprouted'), revision_id='rev-3')
 
2560
        # Not all revisions are copied into the sprouted repository.
 
2561
        repo = sprouted.open_repository()
 
2562
        self.addCleanup(repo.lock_read().unlock)
 
2563
        self.assertEqual(None, repo.get_parent_map(['rev-1']).get('rev-1'))
 
2564
 
 
2565
    def test_format_initialize_find_open(self):
 
2566
        # loopback test to check the current format initializes to itself.
 
2567
        if not self.bzrdir_format.is_supported():
 
2568
            # unsupported formats are not loopback testable
 
2569
            # because the default open will not open them and
 
2570
            # they may not be initializable.
 
2571
            return
 
2572
        # for remote formats, there must be no prior assumption about the
 
2573
        # network name to use - it's possible that this may somehow have got
 
2574
        # in through an unisolated test though - see
 
2575
        # <https://bugs.launchpad.net/bzr/+bug/504102>
 
2576
        self.assertEquals(getattr(self.bzrdir_format,
 
2577
            '_network_name', None),
 
2578
            None)
 
2579
        # supported formats must be able to init and open
 
2580
        t = self.get_transport()
 
2581
        readonly_t = self.get_readonly_transport()
 
2582
        made_control = self.bzrdir_format.initialize(t.base)
 
2583
        self.assertIsInstance(made_control, controldir.ControlDir)
 
2584
        if isinstance(self.bzrdir_format, RemoteBzrDirFormat):
 
2585
            return
 
2586
        self.assertEqual(self.bzrdir_format,
 
2587
                         controldir.ControlDirFormat.find_format(readonly_t))
 
2588
        direct_opened_dir = self.bzrdir_format.open(readonly_t)
 
2589
        opened_dir = bzrdir.BzrDir.open(t.base)
 
2590
        self.assertEqual(made_control._format,
 
2591
                         opened_dir._format)
 
2592
        self.assertEqual(direct_opened_dir._format,
 
2593
                         opened_dir._format)
 
2594
        self.assertIsInstance(opened_dir, controldir.ControlDir)
 
2595
 
 
2596
    def test_format_initialize_on_transport_ex(self):
 
2597
        t = self.get_transport('dir')
 
2598
        self.assertInitializeEx(t)
 
2599
 
 
2600
    def test_format_initialize_on_transport_ex_use_existing_dir_True(self):
 
2601
        t = self.get_transport('dir')
 
2602
        t.ensure_base()
 
2603
        self.assertInitializeEx(t, use_existing_dir=True)
 
2604
 
 
2605
    def test_format_initialize_on_transport_ex_use_existing_dir_False(self):
 
2606
        if not self.bzrdir_format.is_supported():
 
2607
            # Not initializable - not a failure either.
 
2608
            return
 
2609
        t = self.get_transport('dir')
 
2610
        t.ensure_base()
 
2611
        self.assertRaises(errors.FileExists,
 
2612
            self.bzrdir_format.initialize_on_transport_ex, t,
 
2613
            use_existing_dir=False)
 
2614
 
 
2615
    def test_format_initialize_on_transport_ex_create_prefix_True(self):
 
2616
        t = self.get_transport('missing/dir')
 
2617
        self.assertInitializeEx(t, create_prefix=True)
 
2618
 
 
2619
    def test_format_initialize_on_transport_ex_create_prefix_False(self):
 
2620
        if not self.bzrdir_format.is_supported():
 
2621
            # Not initializable - not a failure either.
 
2622
            return
 
2623
        t = self.get_transport('missing/dir')
 
2624
        self.assertRaises(errors.NoSuchFile, self.assertInitializeEx, t,
 
2625
            create_prefix=False)
 
2626
 
 
2627
    def test_format_initialize_on_transport_ex_force_new_repo_True(self):
 
2628
        t = self.get_transport('repo')
 
2629
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
2630
        repo_name = repo_fmt.repository_format.network_name()
 
2631
        repo = repo_fmt.initialize_on_transport_ex(t,
 
2632
            repo_format_name=repo_name, shared_repo=True)[0]
 
2633
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
2634
            force_new_repo=True, repo_format_name=repo_name)
 
2635
        if control is None:
 
2636
            # uninitialisable format
 
2637
            return
 
2638
        self.assertNotEqual(repo.bzrdir.root_transport.base,
 
2639
            made_repo.bzrdir.root_transport.base)
 
2640
 
 
2641
    def test_format_initialize_on_transport_ex_force_new_repo_False(self):
 
2642
        t = self.get_transport('repo')
 
2643
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
2644
        repo_name = repo_fmt.repository_format.network_name()
 
2645
        repo = repo_fmt.initialize_on_transport_ex(t,
 
2646
            repo_format_name=repo_name, shared_repo=True)[0]
 
2647
        made_repo, control = self.assertInitializeEx(t.clone('branch'),
 
2648
            force_new_repo=False, repo_format_name=repo_name)
 
2649
        if control is None:
 
2650
            # uninitialisable format
 
2651
            return
 
2652
        if not control._format.fixed_components:
 
2653
            self.assertEqual(repo.bzrdir.root_transport.base,
 
2654
                made_repo.bzrdir.root_transport.base)
 
2655
 
 
2656
    def test_format_initialize_on_transport_ex_stacked_on(self):
 
2657
        # trunk is a stackable format.  Note that its in the same server area
 
2658
        # which is what launchpad does, but not sufficient to exercise the
 
2659
        # general case.
 
2660
        trunk = self.make_branch('trunk', format='1.9')
 
2661
        t = self.get_transport('stacked')
 
2662
        old_fmt = bzrdir.format_registry.make_bzrdir('pack-0.92')
 
2663
        repo_name = old_fmt.repository_format.network_name()
 
2664
        # Should end up with a 1.9 format (stackable)
 
2665
        repo, control = self.assertInitializeEx(t, need_meta=True,
 
2666
            repo_format_name=repo_name, stacked_on='../trunk',
 
2667
            stack_on_pwd=t.base)
 
2668
        if control is None:
 
2669
            # uninitialisable format
 
2670
            return
 
2671
        self.assertLength(1, repo._fallback_repositories)
 
2672
 
 
2673
    def test_format_initialize_on_transport_ex_default_stack_on(self):
 
2674
        # When initialize_on_transport_ex uses a stacked-on branch because of
 
2675
        # a stacking policy on the target, the location of the fallback
 
2676
        # repository is the same as the external location of the stacked-on
 
2677
        # branch.
 
2678
        balloon = self.make_bzrdir('balloon')
 
2679
        if isinstance(balloon._format, bzrdir.BzrDirMetaFormat1):
 
2680
            stack_on = self.make_branch('stack-on', format='1.9')
 
2681
        else:
 
2682
            stack_on = self.make_branch('stack-on')
 
2683
        config = self.make_bzrdir('.').get_config()
 
2684
        try:
 
2685
            config.set_default_stack_on('stack-on')
 
2686
        except errors.BzrError:
 
2687
            raise TestNotApplicable('Only relevant for stackable formats.')
 
2688
        # Initialize a bzrdir subject to the policy.
 
2689
        t = self.get_transport('stacked')
 
2690
        repo_fmt = bzrdir.format_registry.make_bzrdir('1.9')
 
2691
        repo_name = repo_fmt.repository_format.network_name()
 
2692
        repo, control = self.assertInitializeEx(
 
2693
            t, need_meta=True, repo_format_name=repo_name, stacked_on=None)
 
2694
        # self.addCleanup(repo.unlock)
 
2695
        if control is None:
 
2696
            # uninitialisable format
 
2697
            return
 
2698
        # There's one fallback repo, with a public location.
 
2699
        self.assertLength(1, repo._fallback_repositories)
 
2700
        fallback_repo = repo._fallback_repositories[0]
 
2701
        self.assertEqual(
 
2702
            stack_on.base, fallback_repo.bzrdir.root_transport.base)
 
2703
        # The bzrdir creates a branch in stacking-capable format.
 
2704
        new_branch = control.create_branch()
 
2705
        self.assertTrue(new_branch._format.supports_stacking())
 
2706
 
 
2707
    def test_format_initialize_on_transport_ex_repo_fmt_name_None(self):
 
2708
        t = self.get_transport('dir')
 
2709
        repo, control = self.assertInitializeEx(t)
 
2710
        self.assertEqual(None, repo)
 
2711
 
 
2712
    def test_format_initialize_on_transport_ex_repo_fmt_name_followed(self):
 
2713
        t = self.get_transport('dir')
 
2714
        # 1.6 is likely to never be default
 
2715
        fmt = bzrdir.format_registry.make_bzrdir('1.6')
 
2716
        repo_name = fmt.repository_format.network_name()
 
2717
        repo, control = self.assertInitializeEx(t, repo_format_name=repo_name)
 
2718
        if control is None:
 
2719
            # uninitialisable format
 
2720
            return
 
2721
        if self.bzrdir_format.fixed_components:
 
2722
            # must stay with the all-in-one-format.
 
2723
            repo_name = self.bzrdir_format.network_name()
 
2724
        self.assertEqual(repo_name, repo._format.network_name())
 
2725
 
 
2726
    def assertInitializeEx(self, t, need_meta=False, **kwargs):
 
2727
        """Execute initialize_on_transport_ex and check it succeeded correctly.
 
2728
 
 
2729
        This involves checking that the disk objects were created, open with
 
2730
        the same format returned, and had the expected disk format.
 
2731
 
 
2732
        :param t: The transport to initialize on.
 
2733
        :param **kwargs: Additional arguments to pass to
 
2734
            initialize_on_transport_ex.
 
2735
        :return: the resulting repo, control dir tuple.
 
2736
        """
 
2737
        if not self.bzrdir_format.is_supported():
 
2738
            # Not initializable - not a failure either.
 
2739
            return None, None
 
2740
        repo, control, require_stacking, repo_policy = \
 
2741
            self.bzrdir_format.initialize_on_transport_ex(t, **kwargs)
 
2742
        if repo is not None:
 
2743
            # Repositories are open write-locked
 
2744
            self.assertTrue(repo.is_write_locked())
 
2745
            self.addCleanup(repo.unlock)
 
2746
        self.assertIsInstance(control, controldir.ControlDir)
 
2747
        opened = bzrdir.BzrDir.open(t.base)
 
2748
        expected_format = self.bzrdir_format
 
2749
        if need_meta and expected_format.fixed_components:
 
2750
            # Pre-metadir formats change when we are making something that
 
2751
            # needs a metaformat, because clone is used for push.
 
2752
            expected_format = bzrdir.BzrDirMetaFormat1()
 
2753
        if not isinstance(expected_format, RemoteBzrDirFormat):
 
2754
            self.assertEqual(control._format.network_name(),
 
2755
                expected_format.network_name())
 
2756
            self.assertEqual(control._format.network_name(),
 
2757
                opened._format.network_name())
 
2758
        self.assertEqual(control.__class__, opened.__class__)
 
2759
        return repo, control
 
2760
 
 
2761
    def test_format_network_name(self):
 
2762
        # All control formats must have a network name.
 
2763
        dir = self.make_bzrdir('.')
 
2764
        format = dir._format
 
2765
        # We want to test that the network_name matches the actual format on
 
2766
        # disk. For local control dirsthat means that using network_name as a
 
2767
        # key in the registry gives back the same format. For remote obects
 
2768
        # we check that the network_name of the RemoteBzrDirFormat we have
 
2769
        # locally matches the actual format present on disk.
 
2770
        if isinstance(format, RemoteBzrDirFormat):
 
2771
            dir._ensure_real()
 
2772
            real_dir = dir._real_bzrdir
 
2773
            network_name = format.network_name()
 
2774
            self.assertEqual(real_dir._format.network_name(), network_name)
 
2775
        else:
 
2776
            registry = controldir.network_format_registry
 
2777
            network_name = format.network_name()
 
2778
            looked_up_format = registry.get(network_name)
 
2779
            self.assertTrue(
 
2780
                issubclass(format.__class__, looked_up_format.__class__))
 
2781
        # The network name must be a byte string.
 
2782
        self.assertIsInstance(network_name, str)
 
2783
 
 
2784
    def test_open_not_bzrdir(self):
 
2785
        # test the formats specific behaviour for no-content or similar dirs.
 
2786
        self.assertRaises(errors.NotBranchError,
 
2787
                          self.bzrdir_format.open,
 
2788
                          transport.get_transport(self.get_readonly_url()))
 
2789
 
 
2790
    def test_create_branch(self):
 
2791
        # a bzrdir can construct a branch and repository for itself.
 
2792
        if not self.bzrdir_format.is_supported():
 
2793
            # unsupported formats are not loopback testable
 
2794
            # because the default open will not open them and
 
2795
            # they may not be initializable.
 
2796
            return
 
2797
        t = self.get_transport()
 
2798
        made_control = self.bzrdir_format.initialize(t.base)
 
2799
        made_repo = made_control.create_repository()
 
2800
        made_branch = made_control.create_branch()
 
2801
        self.assertIsInstance(made_branch, bzrlib.branch.Branch)
 
2802
        self.assertEqual(made_control, made_branch.bzrdir)
 
2803
 
 
2804
    def test_open_branch(self):
 
2805
        if not self.bzrdir_format.is_supported():
 
2806
            # unsupported formats are not loopback testable
 
2807
            # because the default open will not open them and
 
2808
            # they may not be initializable.
 
2809
            return
 
2810
        t = self.get_transport()
 
2811
        made_control = self.bzrdir_format.initialize(t.base)
 
2812
        made_repo = made_control.create_repository()
 
2813
        made_branch = made_control.create_branch()
 
2814
        opened_branch = made_control.open_branch()
 
2815
        self.assertEqual(made_control, opened_branch.bzrdir)
 
2816
        self.assertIsInstance(opened_branch, made_branch.__class__)
 
2817
        self.assertIsInstance(opened_branch._format, made_branch._format.__class__)
 
2818
 
 
2819
    def test_list_branches(self):
 
2820
        if not self.bzrdir_format.is_supported():
 
2821
            # unsupported formats are not loopback testable
 
2822
            # because the default open will not open them and
 
2823
            # they may not be initializable.
 
2824
            return
 
2825
        t = self.get_transport()
 
2826
        made_control = self.bzrdir_format.initialize(t.base)
 
2827
        made_repo = made_control.create_repository()
 
2828
        made_branch = made_control.create_branch()
 
2829
        branches = made_control.list_branches()
 
2830
        self.assertEquals(1, len(branches))
 
2831
        self.assertEquals(made_branch.base, branches[0].base)
 
2832
        try:
 
2833
            made_control.destroy_branch()
 
2834
        except errors.UnsupportedOperation:
 
2835
            pass # Not all bzrdirs support destroying directories
 
2836
        else:
 
2837
            self.assertEquals([], made_control.list_branches())
 
2838
 
 
2839
    def test_create_repository(self):
 
2840
        # a bzrdir can construct a repository for itself.
 
2841
        if not self.bzrdir_format.is_supported():
 
2842
            # unsupported formats are not loopback testable
 
2843
            # because the default open will not open them and
 
2844
            # they may not be initializable.
 
2845
            return
 
2846
        t = self.get_transport()
 
2847
        made_control = self.bzrdir_format.initialize(t.base)
 
2848
        made_repo = made_control.create_repository()
 
2849
        # Check that we have a repository object.
 
2850
        made_repo.has_revision('foo')
 
2851
        self.assertEqual(made_control, made_repo.bzrdir)
 
2852
 
 
2853
    def test_create_repository_shared(self):
 
2854
        # a bzrdir can create a shared repository or
 
2855
        # fail appropriately
 
2856
        if not self.bzrdir_format.is_supported():
 
2857
            # unsupported formats are not loopback testable
 
2858
            # because the default open will not open them and
 
2859
            # they may not be initializable.
 
2860
            return
 
2861
        t = self.get_transport()
 
2862
        made_control = self.bzrdir_format.initialize(t.base)
 
2863
        try:
 
2864
            made_repo = made_control.create_repository(shared=True)
 
2865
        except errors.IncompatibleFormat:
 
2866
            # Old bzrdir formats don't support shared repositories
 
2867
            # and should raise IncompatibleFormat
 
2868
            return
 
2869
        self.assertTrue(made_repo.is_shared())
 
2870
 
 
2871
    def test_create_repository_nonshared(self):
 
2872
        # a bzrdir can create a non-shared repository
 
2873
        if not self.bzrdir_format.is_supported():
 
2874
            # unsupported formats are not loopback testable
 
2875
            # because the default open will not open them and
 
2876
            # they may not be initializable.
 
2877
            return
 
2878
        t = self.get_transport()
 
2879
        made_control = self.bzrdir_format.initialize(t.base)
 
2880
        made_repo = made_control.create_repository(shared=False)
 
2881
        self.assertFalse(made_repo.is_shared())
 
2882
 
 
2883
    def test_open_repository(self):
 
2884
        if not self.bzrdir_format.is_supported():
 
2885
            # unsupported formats are not loopback testable
 
2886
            # because the default open will not open them and
 
2887
            # they may not be initializable.
 
2888
            return
 
2889
        t = self.get_transport()
 
2890
        made_control = self.bzrdir_format.initialize(t.base)
 
2891
        made_repo = made_control.create_repository()
 
2892
        opened_repo = made_control.open_repository()
 
2893
        self.assertEqual(made_control, opened_repo.bzrdir)
 
2894
        self.assertIsInstance(opened_repo, made_repo.__class__)
 
2895
        self.assertIsInstance(opened_repo._format, made_repo._format.__class__)
 
2896
 
 
2897
    def test_create_workingtree(self):
 
2898
        # a bzrdir can construct a working tree for itself.
 
2899
        if not self.bzrdir_format.is_supported():
 
2900
            # unsupported formats are not loopback testable
 
2901
            # because the default open will not open them and
 
2902
            # they may not be initializable.
 
2903
            return
 
2904
        t = self.get_transport()
 
2905
        made_control = self.bzrdir_format.initialize(t.base)
 
2906
        made_repo = made_control.create_repository()
 
2907
        made_branch = made_control.create_branch()
 
2908
        made_tree = self.createWorkingTreeOrSkip(made_control)
 
2909
        self.assertIsInstance(made_tree, workingtree.WorkingTree)
 
2910
        self.assertEqual(made_control, made_tree.bzrdir)
 
2911
 
 
2912
    def test_create_workingtree_revision(self):
 
2913
        # a bzrdir can construct a working tree for itself @ a specific revision.
 
2914
        t = self.get_transport()
 
2915
        source = self.make_branch_and_tree('source')
 
2916
        source.commit('a', rev_id='a', allow_pointless=True)
 
2917
        source.commit('b', rev_id='b', allow_pointless=True)
 
2918
        t.mkdir('new')
 
2919
        t_new = t.clone('new')
 
2920
        made_control = self.bzrdir_format.initialize_on_transport(t_new)
 
2921
        source.branch.repository.clone(made_control)
 
2922
        source.branch.clone(made_control)
 
2923
        try:
 
2924
            made_tree = made_control.create_workingtree(revision_id='a')
 
2925
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
2926
            raise TestSkipped("Can't make working tree on transport %r" % t)
 
2927
        self.assertEqual(['a'], made_tree.get_parent_ids())
 
2928
 
 
2929
    def test_open_workingtree(self):
 
2930
        if not self.bzrdir_format.is_supported():
 
2931
            # unsupported formats are not loopback testable
 
2932
            # because the default open will not open them and
 
2933
            # they may not be initializable.
 
2934
            return
 
2935
        # this has to be tested with local access as we still support creating
 
2936
        # format 6 bzrdirs
 
2937
        t = self.get_transport()
 
2938
        try:
 
2939
            made_control = self.bzrdir_format.initialize(t.base)
 
2940
            made_repo = made_control.create_repository()
 
2941
            made_branch = made_control.create_branch()
 
2942
            made_tree = made_control.create_workingtree()
 
2943
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
2944
            raise TestSkipped("Can't initialize %r on transport %r"
 
2945
                              % (self.bzrdir_format, t))
 
2946
        opened_tree = made_control.open_workingtree()
 
2947
        self.assertEqual(made_control, opened_tree.bzrdir)
 
2948
        self.assertIsInstance(opened_tree, made_tree.__class__)
 
2949
        self.assertIsInstance(opened_tree._format, made_tree._format.__class__)
 
2950
 
 
2951
    def test_get_selected_branch(self):
 
2952
        # The segment parameters are accessible from the root transport
 
2953
        # if a URL with segment parameters is opened.
 
2954
        if not self.bzrdir_format.is_supported():
 
2955
            # unsupported formats are not loopback testable
 
2956
            # because the default open will not open them and
 
2957
            # they may not be initializable.
 
2958
            return
 
2959
        t = self.get_transport()
 
2960
        try:
 
2961
            made_control = self.bzrdir_format.initialize(t.base)
 
2962
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
2963
            raise TestSkipped("Can't initialize %r on transport %r"
 
2964
                              % (self.bzrdir_format, t))
 
2965
        dir = bzrdir.BzrDir.open(t.base+",branch=foo")
 
2966
        self.assertEquals({"branch": "foo"},
 
2967
            dir.user_transport.get_segment_parameters())
 
2968
        self.assertEquals("foo", dir._get_selected_branch())
 
2969
 
 
2970
    def test_get_selected_branch_none_selected(self):
 
2971
        # _get_selected_branch defaults to None
 
2972
        if not self.bzrdir_format.is_supported():
 
2973
            # unsupported formats are not loopback testable
 
2974
            # because the default open will not open them and
 
2975
            # they may not be initializable.
 
2976
            return
 
2977
        t = self.get_transport()
 
2978
        try:
 
2979
            made_control = self.bzrdir_format.initialize(t.base)
 
2980
        except (errors.NotLocalUrl, errors.UnsupportedOperation):
 
2981
            raise TestSkipped("Can't initialize %r on transport %r"
 
2982
                              % (self.bzrdir_format, t))
 
2983
        dir = bzrdir.BzrDir.open(t.base)
 
2984
        self.assertIs(None, dir._get_selected_branch())
 
2985
 
 
2986
    def test_root_transport(self):
 
2987
        dir = self.make_bzrdir('.')
 
2988
        self.assertEqual(dir.root_transport.base,
 
2989
                         self.get_transport().base)
 
2990
 
 
2991
    def test_find_repository_no_repo_under_standalone_branch(self):
 
2992
        # finding a repo stops at standalone branches even if there is a
 
2993
        # higher repository available.
 
2994
        try:
 
2995
            repo = self.make_repository('.', shared=True)
 
2996
        except errors.IncompatibleFormat:
 
2997
            # need a shared repository to test this.
 
2998
            return
 
2999
        url = self.get_url('intermediate')
 
3000
        t = self.get_transport()
 
3001
        t.mkdir('intermediate')
 
3002
        t.mkdir('intermediate/child')
 
3003
        made_control = self.bzrdir_format.initialize(url)
 
3004
        made_control.create_repository()
 
3005
        innermost_control = self.bzrdir_format.initialize(
 
3006
            self.get_url('intermediate/child'))
 
3007
        try:
 
3008
            child_repo = innermost_control.open_repository()
 
3009
            # if there is a repository, then the format cannot ever hit this
 
3010
            # code path.
 
3011
            return
 
3012
        except errors.NoRepositoryPresent:
 
3013
            pass
 
3014
        self.assertRaises(errors.NoRepositoryPresent,
 
3015
                          innermost_control.find_repository)
 
3016
 
 
3017
    def test_find_repository_containing_shared_repository(self):
 
3018
        # find repo inside a shared repo with an empty control dir
 
3019
        # returns the shared repo.
 
3020
        try:
 
3021
            repo = self.make_repository('.', shared=True)
 
3022
        except errors.IncompatibleFormat:
 
3023
            # need a shared repository to test this.
 
3024
            return
 
3025
        url = self.get_url('childbzrdir')
 
3026
        self.get_transport().mkdir('childbzrdir')
 
3027
        made_control = self.bzrdir_format.initialize(url)
 
3028
        try:
 
3029
            child_repo = made_control.open_repository()
 
3030
            # if there is a repository, then the format cannot ever hit this
 
3031
            # code path.
 
3032
            return
 
3033
        except errors.NoRepositoryPresent:
 
3034
            pass
 
3035
        found_repo = made_control.find_repository()
 
3036
        self.assertEqual(repo.bzrdir.root_transport.base,
 
3037
                         found_repo.bzrdir.root_transport.base)
 
3038
 
 
3039
    def test_find_repository_standalone_with_containing_shared_repository(self):
 
3040
        # find repo inside a standalone repo inside a shared repo finds the standalone repo
 
3041
        try:
 
3042
            containing_repo = self.make_repository('.', shared=True)
 
3043
        except errors.IncompatibleFormat:
 
3044
            # need a shared repository to test this.
 
3045
            return
 
3046
        child_repo = self.make_repository('childrepo')
 
3047
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
3048
        found_repo = opened_control.find_repository()
 
3049
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
3050
                         found_repo.bzrdir.root_transport.base)
 
3051
 
 
3052
    def test_find_repository_shared_within_shared_repository(self):
 
3053
        # find repo at a shared repo inside a shared repo finds the inner repo
 
3054
        try:
 
3055
            containing_repo = self.make_repository('.', shared=True)
 
3056
        except errors.IncompatibleFormat:
 
3057
            # need a shared repository to test this.
 
3058
            return
 
3059
        url = self.get_url('childrepo')
 
3060
        self.get_transport().mkdir('childrepo')
 
3061
        child_control = self.bzrdir_format.initialize(url)
 
3062
        child_repo = child_control.create_repository(shared=True)
 
3063
        opened_control = bzrdir.BzrDir.open(self.get_url('childrepo'))
 
3064
        found_repo = opened_control.find_repository()
 
3065
        self.assertEqual(child_repo.bzrdir.root_transport.base,
 
3066
                         found_repo.bzrdir.root_transport.base)
 
3067
        self.assertNotEqual(child_repo.bzrdir.root_transport.base,
 
3068
                            containing_repo.bzrdir.root_transport.base)
 
3069
 
 
3070
    def test_find_repository_with_nested_dirs_works(self):
 
3071
        # find repo inside a bzrdir inside a bzrdir inside a shared repo
 
3072
        # finds the outer shared repo.
 
3073
        try:
 
3074
            repo = self.make_repository('.', shared=True)
 
3075
        except errors.IncompatibleFormat:
 
3076
            # need a shared repository to test this.
 
3077
            return
 
3078
        url = self.get_url('intermediate')
 
3079
        t = self.get_transport()
 
3080
        t.mkdir('intermediate')
 
3081
        t.mkdir('intermediate/child')
 
3082
        made_control = self.bzrdir_format.initialize(url)
 
3083
        try:
 
3084
            child_repo = made_control.open_repository()
 
3085
            # if there is a repository, then the format cannot ever hit this
 
3086
            # code path.
 
3087
            return
 
3088
        except errors.NoRepositoryPresent:
 
3089
            pass
 
3090
        innermost_control = self.bzrdir_format.initialize(
 
3091
            self.get_url('intermediate/child'))
 
3092
        try:
 
3093
            child_repo = innermost_control.open_repository()
 
3094
            # if there is a repository, then the format cannot ever hit this
 
3095
            # code path.
 
3096
            return
 
3097
        except errors.NoRepositoryPresent:
 
3098
            pass
 
3099
        found_repo = innermost_control.find_repository()
 
3100
        self.assertEqual(repo.bzrdir.root_transport.base,
 
3101
                         found_repo.bzrdir.root_transport.base)
 
3102
 
 
3103
    def test_can_and_needs_format_conversion(self):
 
3104
        # check that we can ask an instance if its upgradable
 
3105
        dir = self.make_bzrdir('.')
 
3106
        if dir.can_convert_format():
 
3107
            # if its default updatable there must be an updater
 
3108
            # (we force the latest known format as downgrades may not be
 
3109
            # available
 
3110
            self.assertTrue(isinstance(dir._format.get_converter(
 
3111
                format=dir._format), controldir.Converter))
 
3112
        dir.needs_format_conversion(
 
3113
            controldir.ControlDirFormat.get_default_format())
 
3114
 
 
3115
    def test_backup_copies_existing(self):
 
3116
        tree = self.make_branch_and_tree('test')
 
3117
        self.build_tree(['test/a'])
 
3118
        tree.add(['a'], ['a-id'])
 
3119
        tree.commit('some data to be copied.')
 
3120
        old_url, new_url = tree.bzrdir.backup_bzrdir()
 
3121
        old_path = urlutils.local_path_from_url(old_url)
 
3122
        new_path = urlutils.local_path_from_url(new_url)
 
3123
        self.assertPathExists(old_path)
 
3124
        self.assertPathExists(new_path)
 
3125
        for (((dir_relpath1, _), entries1),
 
3126
             ((dir_relpath2, _), entries2)) in izip(
 
3127
                osutils.walkdirs(old_path),
 
3128
                osutils.walkdirs(new_path)):
 
3129
            self.assertEquals(dir_relpath1, dir_relpath2)
 
3130
            for f1, f2 in zip(entries1, entries2):
 
3131
                self.assertEquals(f1[0], f2[0])
 
3132
                self.assertEquals(f1[2], f2[2])
 
3133
                if f1[2] == "file":
 
3134
                    osutils.compare_files(open(f1[4]), open(f2[4]))
 
3135
 
 
3136
    def test_upgrade_new_instance(self):
 
3137
        """Does an available updater work?"""
 
3138
        dir = self.make_bzrdir('.')
 
3139
        # for now, upgrade is not ready for partial bzrdirs.
 
3140
        dir.create_repository()
 
3141
        dir.create_branch()
 
3142
        self.createWorkingTreeOrSkip(dir)
 
3143
        if dir.can_convert_format():
 
3144
            # if its default updatable there must be an updater
 
3145
            # (we force the latest known format as downgrades may not be
 
3146
            # available
 
3147
            pb = ui.ui_factory.nested_progress_bar()
 
3148
            try:
 
3149
                dir._format.get_converter(format=dir._format).convert(dir, pb)
 
3150
            finally:
 
3151
                pb.finished()
 
3152
            # and it should pass 'check' now.
 
3153
            check.check_dwim(self.get_url('.'), False, True, True)
 
3154
 
 
3155
    def test_format_description(self):
 
3156
        dir = self.make_bzrdir('.')
 
3157
        text = dir._format.get_format_description()
 
3158
        self.assertTrue(len(text))
 
3159
 
 
3160
 
 
3161
class TestBreakLock(TestCaseWithControlDir):
 
3162
 
 
3163
    def test_break_lock_empty(self):
 
3164
        # break lock on an empty bzrdir should work silently.
 
3165
        dir = self.make_bzrdir('.')
 
3166
        try:
 
3167
            dir.break_lock()
 
3168
        except NotImplementedError:
 
3169
            pass
 
3170
 
 
3171
    def test_break_lock_repository(self):
 
3172
        # break lock with just a repo should unlock the repo.
 
3173
        repo = self.make_repository('.')
 
3174
        repo.lock_write()
 
3175
        lock_repo = repo.bzrdir.open_repository()
 
3176
        if not lock_repo.get_physical_lock_status():
 
3177
            # This bzrdir's default repository does not physically lock things
 
3178
            # and thus this interaction cannot be tested at the interface
 
3179
            # level.
 
3180
            repo.unlock()
 
3181
            return
 
3182
        # only one yes needed here: it should only be unlocking
 
3183
        # the repo
 
3184
        bzrlib.ui.ui_factory = CannedInputUIFactory([True])
 
3185
        try:
 
3186
            repo.bzrdir.break_lock()
 
3187
        except NotImplementedError:
 
3188
            # this bzrdir does not implement break_lock - so we cant test it.
 
3189
            repo.unlock()
 
3190
            return
 
3191
        lock_repo.lock_write()
 
3192
        lock_repo.unlock()
 
3193
        self.assertRaises(errors.LockBroken, repo.unlock)
 
3194
 
 
3195
    def test_break_lock_branch(self):
 
3196
        # break lock with just a repo should unlock the branch.
 
3197
        # and not directly try the repository.
 
3198
        # we test this by making a branch reference to a branch
 
3199
        # and repository in another bzrdir
 
3200
        # for pre-metadir formats this will fail, thats ok.
 
3201
        master = self.make_branch('branch')
 
3202
        thisdir = self.make_bzrdir('this')
 
3203
        try:
 
3204
            bzrlib.branch.BranchReferenceFormat().initialize(
 
3205
                thisdir, target_branch=master)
 
3206
        except errors.IncompatibleFormat:
 
3207
            return
 
3208
        unused_repo = thisdir.create_repository()
 
3209
        master.lock_write()
 
3210
        unused_repo.lock_write()
 
3211
        try:
 
3212
            # two yes's : branch and repository. If the repo in this
 
3213
            # dir is inappropriately accessed, 3 will be needed, and
 
3214
            # we'll see that because the stream will be fully consumed
 
3215
            bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
3216
            # determine if the repository will have been locked;
 
3217
            this_repo_locked = \
 
3218
                thisdir.open_repository().get_physical_lock_status()
 
3219
            master.bzrdir.break_lock()
 
3220
            if this_repo_locked:
 
3221
                # only two ys should have been read
 
3222
                self.assertEqual([True],
 
3223
                    bzrlib.ui.ui_factory.responses)
 
3224
            else:
 
3225
                # only one y should have been read
 
3226
                self.assertEqual([True, True],
 
3227
                    bzrlib.ui.ui_factory.responses)
 
3228
            # we should be able to lock a newly opened branch now
 
3229
            branch = master.bzrdir.open_branch()
 
3230
            branch.lock_write()
 
3231
            branch.unlock()
 
3232
            if this_repo_locked:
 
3233
                # we should not be able to lock the repository in thisdir as
 
3234
                # its still held by the explicit lock we took, and the break
 
3235
                # lock should not have touched it.
 
3236
                repo = thisdir.open_repository()
 
3237
                self.assertRaises(errors.LockContention, repo.lock_write)
 
3238
        finally:
 
3239
            unused_repo.unlock()
 
3240
        self.assertRaises(errors.LockBroken, master.unlock)
 
3241
 
 
3242
    def test_break_lock_tree(self):
 
3243
        # break lock with a tree should unlock the tree but not try the
 
3244
        # branch explicitly. However this is very hard to test for as we
 
3245
        # dont have a tree reference class, nor is one needed;
 
3246
        # the worst case if this code unlocks twice is an extra question
 
3247
        # being asked.
 
3248
        tree = self.make_branch_and_tree('.')
 
3249
        tree.lock_write()
 
3250
        # three yes's : tree, branch and repository.
 
3251
        bzrlib.ui.ui_factory = CannedInputUIFactory([True, True, True])
 
3252
        try:
 
3253
            tree.bzrdir.break_lock()
 
3254
        except (NotImplementedError, errors.LockActive):
 
3255
            # bzrdir does not support break_lock
 
3256
            # or one of the locked objects (currently only tree does this)
 
3257
            # raised a LockActive because we do still have a live locked
 
3258
            # object.
 
3259
            tree.unlock()
 
3260
            return
 
3261
        self.assertEqual([True],
 
3262
                bzrlib.ui.ui_factory.responses)
 
3263
        lock_tree = tree.bzrdir.open_workingtree()
 
3264
        lock_tree.lock_write()
 
3265
        lock_tree.unlock()
 
3266
        self.assertRaises(errors.LockBroken, tree.unlock)
 
3267
 
 
3268
 
 
3269
class TestTransportConfig(TestCaseWithControlDir):
 
3270
 
 
3271
    def test_get_config(self):
 
3272
        my_dir = self.make_bzrdir('.')
 
3273
        config = my_dir.get_config()
 
3274
        try:
 
3275
            config.set_default_stack_on('http://example.com')
 
3276
        except errors.BzrError, e:
 
3277
            if 'Cannot set config' in str(e):
 
3278
                self.assertFalse(
 
3279
                    isinstance(my_dir, (bzrdir.BzrDirMeta1, RemoteBzrDir)),
 
3280
                    "%r should support configs" % my_dir)
 
3281
                raise TestNotApplicable(
 
3282
                    'This BzrDir format does not support configs.')
 
3283
            else:
 
3284
                raise
 
3285
        self.assertEqual('http://example.com', config.get_default_stack_on())
 
3286
        my_dir2 = bzrdir.BzrDir.open(self.get_url('.'))
 
3287
        config2 = my_dir2.get_config()
 
3288
        self.assertEqual('http://example.com', config2.get_default_stack_on())
 
3289
 
 
3290
 
 
3291
class ChrootedControlDirTests(ChrootedTestCase):
 
3292
 
 
3293
    def test_find_repository_no_repository(self):
 
3294
        # loopback test to check the current format fails to find a
 
3295
        # share repository correctly.
 
3296
        if not self.bzrdir_format.is_supported():
 
3297
            # unsupported formats are not loopback testable
 
3298
            # because the default open will not open them and
 
3299
            # they may not be initializable.
 
3300
            return
 
3301
        # supported formats must be able to init and open
 
3302
        # - do the vfs initialisation over the basic vfs transport
 
3303
        # XXX: TODO this should become a 'bzrdirlocation' api call.
 
3304
        url = self.get_vfs_only_url('subdir')
 
3305
        transport.get_transport_from_url(self.get_vfs_only_url()).mkdir(
 
3306
            'subdir')
1641
3307
        made_control = self.bzrdir_format.initialize(self.get_url('subdir'))
1642
3308
        try:
1643
3309
            repo = made_control.open_repository()