~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Martin Pool
  • Date: 2010-02-03 00:08:23 UTC
  • mto: This revision was merged to the branch mainline in revision 5002.
  • Revision ID: mbp@sourcefrog.net-20100203000823-fcyf2791xrl3fbfo
expand tabs

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/per_bzr_dir.
 
20
"""
 
21
 
 
22
import os
 
23
import subprocess
 
24
import sys
 
25
 
 
26
from bzrlib import (
 
27
    bzrdir,
 
28
    errors,
 
29
    help_topics,
 
30
    repository,
 
31
    osutils,
 
32
    remote,
 
33
    urlutils,
 
34
    win32utils,
 
35
    workingtree,
 
36
    )
 
37
import bzrlib.branch
 
38
from bzrlib.errors import (NotBranchError,
 
39
                           UnknownFormatError,
 
40
                           UnsupportedFormatError,
 
41
                           )
 
42
from bzrlib.tests import (
 
43
    TestCase,
 
44
    TestCaseWithMemoryTransport,
 
45
    TestCaseWithTransport,
 
46
    TestSkipped,
 
47
    )
 
48
from bzrlib.tests import(
 
49
    http_server,
 
50
    http_utils,
 
51
    )
 
52
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
53
from bzrlib.transport import get_transport
 
54
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
55
from bzrlib.transport.memory import MemoryServer
 
56
from bzrlib.transport.nosmart import NoSmartTransportDecorator
 
57
from bzrlib.transport.readonly import ReadonlyTransportDecorator
 
58
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
59
 
 
60
 
 
61
class TestDefaultFormat(TestCase):
 
62
 
 
63
    def test_get_set_default_format(self):
 
64
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
65
        # default is BzrDirFormat6
 
66
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
 
67
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
68
        # creating a bzr dir should now create an instrumented dir.
 
69
        try:
 
70
            result = bzrdir.BzrDir.create('memory:///')
 
71
            self.failUnless(isinstance(result, SampleBzrDir))
 
72
        finally:
 
73
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
74
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
75
 
 
76
 
 
77
class TestFormatRegistry(TestCase):
 
78
 
 
79
    def make_format_registry(self):
 
80
        my_format_registry = bzrdir.BzrDirFormatRegistry()
 
81
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
 
82
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
83
            ' repositories', deprecated=True)
 
84
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
 
85
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
 
86
        my_format_registry.register_metadir('knit',
 
87
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
 
88
            'Format using knits',
 
89
            )
 
90
        my_format_registry.set_default('knit')
 
91
        my_format_registry.register_metadir(
 
92
            'branch6',
 
93
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
94
            'Experimental successor to knit.  Use at your own risk.',
 
95
            branch_format='bzrlib.branch.BzrBranchFormat6',
 
96
            experimental=True)
 
97
        my_format_registry.register_metadir(
 
98
            'hidden format',
 
99
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
100
            'Experimental successor to knit.  Use at your own risk.',
 
101
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
 
102
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
 
103
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
104
            ' repositories', hidden=True)
 
105
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
 
106
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
 
107
            hidden=True)
 
108
        return my_format_registry
 
109
 
 
110
    def test_format_registry(self):
 
111
        my_format_registry = self.make_format_registry()
 
112
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
 
113
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
114
        my_bzrdir = my_format_registry.make_bzrdir('weave')
 
115
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
116
        my_bzrdir = my_format_registry.make_bzrdir('default')
 
117
        self.assertIsInstance(my_bzrdir.repository_format,
 
118
            knitrepo.RepositoryFormatKnit1)
 
119
        my_bzrdir = my_format_registry.make_bzrdir('knit')
 
120
        self.assertIsInstance(my_bzrdir.repository_format,
 
121
            knitrepo.RepositoryFormatKnit1)
 
122
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
 
123
        self.assertIsInstance(my_bzrdir.get_branch_format(),
 
124
                              bzrlib.branch.BzrBranchFormat6)
 
125
 
 
126
    def test_get_help(self):
 
127
        my_format_registry = self.make_format_registry()
 
128
        self.assertEqual('Format registered lazily',
 
129
                         my_format_registry.get_help('lazy'))
 
130
        self.assertEqual('Format using knits',
 
131
                         my_format_registry.get_help('knit'))
 
132
        self.assertEqual('Format using knits',
 
133
                         my_format_registry.get_help('default'))
 
134
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
 
135
                         ' checkouts or shared repositories',
 
136
                         my_format_registry.get_help('weave'))
 
137
 
 
138
    def test_help_topic(self):
 
139
        topics = help_topics.HelpTopicRegistry()
 
140
        registry = self.make_format_registry()
 
141
        topics.register('current-formats', registry.help_topic,
 
142
                        'Current formats')
 
143
        topics.register('other-formats', registry.help_topic,
 
144
                        'Other formats')
 
145
        new = topics.get_detail('current-formats')
 
146
        rest = topics.get_detail('other-formats')
 
147
        experimental, deprecated = rest.split('Deprecated formats')
 
148
        self.assertContainsRe(new, 'formats-help')
 
149
        self.assertContainsRe(new,
 
150
                ':knit:\n    \(native\) \(default\) Format using knits\n')
 
151
        self.assertContainsRe(experimental,
 
152
                ':branch6:\n    \(native\) Experimental successor to knit')
 
153
        self.assertContainsRe(deprecated,
 
154
                ':lazy:\n    \(native\) Format registered lazily\n')
 
155
        self.assertNotContainsRe(new, 'hidden')
 
156
 
 
157
    def test_set_default_repository(self):
 
158
        default_factory = bzrdir.format_registry.get('default')
 
159
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
 
160
                       if v == default_factory and k != 'default'][0]
 
161
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
 
162
        try:
 
163
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
 
164
                          bzrdir.format_registry.get('default'))
 
165
            self.assertIs(
 
166
                repository.RepositoryFormat.get_default_format().__class__,
 
167
                knitrepo.RepositoryFormatKnit3)
 
168
        finally:
 
169
            bzrdir.format_registry.set_default_repository(old_default)
 
170
 
 
171
    def test_aliases(self):
 
172
        a_registry = bzrdir.BzrDirFormatRegistry()
 
173
        a_registry.register('weave', bzrdir.BzrDirFormat6,
 
174
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
175
            ' repositories', deprecated=True)
 
176
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
 
177
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
178
            ' repositories', deprecated=True, alias=True)
 
179
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
 
180
 
 
181
 
 
182
class SampleBranch(bzrlib.branch.Branch):
 
183
    """A dummy branch for guess what, dummy use."""
 
184
 
 
185
    def __init__(self, dir):
 
186
        self.bzrdir = dir
 
187
 
 
188
 
 
189
class SampleRepository(bzrlib.repository.Repository):
 
190
    """A dummy repo."""
 
191
 
 
192
    def __init__(self, dir):
 
193
        self.bzrdir = dir
 
194
 
 
195
 
 
196
class SampleBzrDir(bzrdir.BzrDir):
 
197
    """A sample BzrDir implementation to allow testing static methods."""
 
198
 
 
199
    def create_repository(self, shared=False):
 
200
        """See BzrDir.create_repository."""
 
201
        return "A repository"
 
202
 
 
203
    def open_repository(self):
 
204
        """See BzrDir.open_repository."""
 
205
        return SampleRepository(self)
 
206
 
 
207
    def create_branch(self):
 
208
        """See BzrDir.create_branch."""
 
209
        return SampleBranch(self)
 
210
 
 
211
    def create_workingtree(self):
 
212
        """See BzrDir.create_workingtree."""
 
213
        return "A tree"
 
214
 
 
215
 
 
216
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
 
217
    """A sample format
 
218
 
 
219
    this format is initializable, unsupported to aid in testing the
 
220
    open and open_downlevel routines.
 
221
    """
 
222
 
 
223
    def get_format_string(self):
 
224
        """See BzrDirFormat.get_format_string()."""
 
225
        return "Sample .bzr dir format."
 
226
 
 
227
    def initialize_on_transport(self, t):
 
228
        """Create a bzr dir."""
 
229
        t.mkdir('.bzr')
 
230
        t.put_bytes('.bzr/branch-format', self.get_format_string())
 
231
        return SampleBzrDir(t, self)
 
232
 
 
233
    def is_supported(self):
 
234
        return False
 
235
 
 
236
    def open(self, transport, _found=None):
 
237
        return "opened branch."
 
238
 
 
239
 
 
240
class TestBzrDirFormat(TestCaseWithTransport):
 
241
    """Tests for the BzrDirFormat facility."""
 
242
 
 
243
    def test_find_format(self):
 
244
        # is the right format object found for a branch?
 
245
        # create a branch with a few known format objects.
 
246
        # this is not quite the same as
 
247
        t = get_transport(self.get_url())
 
248
        self.build_tree(["foo/", "bar/"], transport=t)
 
249
        def check_format(format, url):
 
250
            format.initialize(url)
 
251
            t = get_transport(url)
 
252
            found_format = bzrdir.BzrDirFormat.find_format(t)
 
253
            self.failUnless(isinstance(found_format, format.__class__))
 
254
        check_format(bzrdir.BzrDirFormat5(), "foo")
 
255
        check_format(bzrdir.BzrDirFormat6(), "bar")
 
256
 
 
257
    def test_find_format_nothing_there(self):
 
258
        self.assertRaises(NotBranchError,
 
259
                          bzrdir.BzrDirFormat.find_format,
 
260
                          get_transport('.'))
 
261
 
 
262
    def test_find_format_unknown_format(self):
 
263
        t = get_transport(self.get_url())
 
264
        t.mkdir('.bzr')
 
265
        t.put_bytes('.bzr/branch-format', '')
 
266
        self.assertRaises(UnknownFormatError,
 
267
                          bzrdir.BzrDirFormat.find_format,
 
268
                          get_transport('.'))
 
269
 
 
270
    def test_register_unregister_format(self):
 
271
        format = SampleBzrDirFormat()
 
272
        url = self.get_url()
 
273
        # make a bzrdir
 
274
        format.initialize(url)
 
275
        # register a format for it.
 
276
        bzrdir.BzrDirFormat.register_format(format)
 
277
        # which bzrdir.Open will refuse (not supported)
 
278
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
 
279
        # which bzrdir.open_containing will refuse (not supported)
 
280
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
 
281
        # but open_downlevel will work
 
282
        t = get_transport(url)
 
283
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
 
284
        # unregister the format
 
285
        bzrdir.BzrDirFormat.unregister_format(format)
 
286
        # now open_downlevel should fail too.
 
287
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
 
288
 
 
289
    def test_create_branch_and_repo_uses_default(self):
 
290
        format = SampleBzrDirFormat()
 
291
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
292
                                                      format=format)
 
293
        self.assertTrue(isinstance(branch, SampleBranch))
 
294
 
 
295
    def test_create_branch_and_repo_under_shared(self):
 
296
        # creating a branch and repo in a shared repo uses the
 
297
        # shared repository
 
298
        format = bzrdir.format_registry.make_bzrdir('knit')
 
299
        self.make_repository('.', shared=True, format=format)
 
300
        branch = bzrdir.BzrDir.create_branch_and_repo(
 
301
            self.get_url('child'), format=format)
 
302
        self.assertRaises(errors.NoRepositoryPresent,
 
303
                          branch.bzrdir.open_repository)
 
304
 
 
305
    def test_create_branch_and_repo_under_shared_force_new(self):
 
306
        # creating a branch and repo in a shared repo can be forced to
 
307
        # make a new repo
 
308
        format = bzrdir.format_registry.make_bzrdir('knit')
 
309
        self.make_repository('.', shared=True, format=format)
 
310
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
 
311
                                                      force_new_repo=True,
 
312
                                                      format=format)
 
313
        branch.bzrdir.open_repository()
 
314
 
 
315
    def test_create_standalone_working_tree(self):
 
316
        format = SampleBzrDirFormat()
 
317
        # note this is deliberately readonly, as this failure should
 
318
        # occur before any writes.
 
319
        self.assertRaises(errors.NotLocalUrl,
 
320
                          bzrdir.BzrDir.create_standalone_workingtree,
 
321
                          self.get_readonly_url(), format=format)
 
322
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
 
323
                                                           format=format)
 
324
        self.assertEqual('A tree', tree)
 
325
 
 
326
    def test_create_standalone_working_tree_under_shared_repo(self):
 
327
        # create standalone working tree always makes a repo.
 
328
        format = bzrdir.format_registry.make_bzrdir('knit')
 
329
        self.make_repository('.', shared=True, format=format)
 
330
        # note this is deliberately readonly, as this failure should
 
331
        # occur before any writes.
 
332
        self.assertRaises(errors.NotLocalUrl,
 
333
                          bzrdir.BzrDir.create_standalone_workingtree,
 
334
                          self.get_readonly_url('child'), format=format)
 
335
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
 
336
            format=format)
 
337
        tree.bzrdir.open_repository()
 
338
 
 
339
    def test_create_branch_convenience(self):
 
340
        # outside a repo the default convenience output is a repo+branch_tree
 
341
        format = bzrdir.format_registry.make_bzrdir('knit')
 
342
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
 
343
        branch.bzrdir.open_workingtree()
 
344
        branch.bzrdir.open_repository()
 
345
 
 
346
    def test_create_branch_convenience_possible_transports(self):
 
347
        """Check that the optional 'possible_transports' is recognized"""
 
348
        format = bzrdir.format_registry.make_bzrdir('knit')
 
349
        t = self.get_transport()
 
350
        branch = bzrdir.BzrDir.create_branch_convenience(
 
351
            '.', format=format, possible_transports=[t])
 
352
        branch.bzrdir.open_workingtree()
 
353
        branch.bzrdir.open_repository()
 
354
 
 
355
    def test_create_branch_convenience_root(self):
 
356
        """Creating a branch at the root of a fs should work."""
 
357
        self.vfs_transport_factory = MemoryServer
 
358
        # outside a repo the default convenience output is a repo+branch_tree
 
359
        format = bzrdir.format_registry.make_bzrdir('knit')
 
360
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
 
361
                                                         format=format)
 
362
        self.assertRaises(errors.NoWorkingTree,
 
363
                          branch.bzrdir.open_workingtree)
 
364
        branch.bzrdir.open_repository()
 
365
 
 
366
    def test_create_branch_convenience_under_shared_repo(self):
 
367
        # inside a repo the default convenience output is a branch+ follow the
 
368
        # repo tree policy
 
369
        format = bzrdir.format_registry.make_bzrdir('knit')
 
370
        self.make_repository('.', shared=True, format=format)
 
371
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
372
            format=format)
 
373
        branch.bzrdir.open_workingtree()
 
374
        self.assertRaises(errors.NoRepositoryPresent,
 
375
                          branch.bzrdir.open_repository)
 
376
 
 
377
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
 
378
        # inside a repo the default convenience output is a branch+ follow the
 
379
        # repo tree policy but we can override that
 
380
        format = bzrdir.format_registry.make_bzrdir('knit')
 
381
        self.make_repository('.', shared=True, format=format)
 
382
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
383
            force_new_tree=False, format=format)
 
384
        self.assertRaises(errors.NoWorkingTree,
 
385
                          branch.bzrdir.open_workingtree)
 
386
        self.assertRaises(errors.NoRepositoryPresent,
 
387
                          branch.bzrdir.open_repository)
 
388
 
 
389
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
 
390
        # inside a repo the default convenience output is a branch+ follow the
 
391
        # repo tree policy
 
392
        format = bzrdir.format_registry.make_bzrdir('knit')
 
393
        repo = self.make_repository('.', shared=True, format=format)
 
394
        repo.set_make_working_trees(False)
 
395
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
396
                                                         format=format)
 
397
        self.assertRaises(errors.NoWorkingTree,
 
398
                          branch.bzrdir.open_workingtree)
 
399
        self.assertRaises(errors.NoRepositoryPresent,
 
400
                          branch.bzrdir.open_repository)
 
401
 
 
402
    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
 
403
        # inside a repo the default convenience output is a branch+ follow the
 
404
        # repo tree policy but we can override that
 
405
        format = bzrdir.format_registry.make_bzrdir('knit')
 
406
        repo = self.make_repository('.', shared=True, format=format)
 
407
        repo.set_make_working_trees(False)
 
408
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
409
            force_new_tree=True, format=format)
 
410
        branch.bzrdir.open_workingtree()
 
411
        self.assertRaises(errors.NoRepositoryPresent,
 
412
                          branch.bzrdir.open_repository)
 
413
 
 
414
    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
 
415
        # inside a repo the default convenience output is overridable to give
 
416
        # repo+branch+tree
 
417
        format = bzrdir.format_registry.make_bzrdir('knit')
 
418
        self.make_repository('.', shared=True, format=format)
 
419
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
420
            force_new_repo=True, format=format)
 
421
        branch.bzrdir.open_repository()
 
422
        branch.bzrdir.open_workingtree()
 
423
 
 
424
 
 
425
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
 
426
 
 
427
    def test_acquire_repository_standalone(self):
 
428
        """The default acquisition policy should create a standalone branch."""
 
429
        my_bzrdir = self.make_bzrdir('.')
 
430
        repo_policy = my_bzrdir.determine_repository_policy()
 
431
        repo, is_new = repo_policy.acquire_repository()
 
432
        self.assertEqual(repo.bzrdir.root_transport.base,
 
433
                         my_bzrdir.root_transport.base)
 
434
        self.assertFalse(repo.is_shared())
 
435
 
 
436
    def test_determine_stacking_policy(self):
 
437
        parent_bzrdir = self.make_bzrdir('.')
 
438
        child_bzrdir = self.make_bzrdir('child')
 
439
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
 
440
        repo_policy = child_bzrdir.determine_repository_policy()
 
441
        self.assertEqual('http://example.org', repo_policy._stack_on)
 
442
 
 
443
    def test_determine_stacking_policy_relative(self):
 
444
        parent_bzrdir = self.make_bzrdir('.')
 
445
        child_bzrdir = self.make_bzrdir('child')
 
446
        parent_bzrdir.get_config().set_default_stack_on('child2')
 
447
        repo_policy = child_bzrdir.determine_repository_policy()
 
448
        self.assertEqual('child2', repo_policy._stack_on)
 
449
        self.assertEqual(parent_bzrdir.root_transport.base,
 
450
                         repo_policy._stack_on_pwd)
 
451
 
 
452
    def prepare_default_stacking(self, child_format='1.6'):
 
453
        parent_bzrdir = self.make_bzrdir('.')
 
454
        child_branch = self.make_branch('child', format=child_format)
 
455
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
 
456
        new_child_transport = parent_bzrdir.transport.clone('child2')
 
457
        return child_branch, new_child_transport
 
458
 
 
459
    def test_clone_on_transport_obeys_stacking_policy(self):
 
460
        child_branch, new_child_transport = self.prepare_default_stacking()
 
461
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
462
        self.assertEqual(child_branch.base,
 
463
                         new_child.open_branch().get_stacked_on_url())
 
464
 
 
465
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
 
466
        # Make stackable source branch with an unstackable repo format.
 
467
        source_bzrdir = self.make_bzrdir('source')
 
468
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
 
469
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
 
470
        # Make a directory with a default stacking policy
 
471
        parent_bzrdir = self.make_bzrdir('parent')
 
472
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
 
473
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
 
474
        # Clone source into directory
 
475
        target = source_bzrdir.clone(self.get_url('parent/target'))
 
476
 
 
477
    def test_sprout_obeys_stacking_policy(self):
 
478
        child_branch, new_child_transport = self.prepare_default_stacking()
 
479
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
480
        self.assertEqual(child_branch.base,
 
481
                         new_child.open_branch().get_stacked_on_url())
 
482
 
 
483
    def test_clone_ignores_policy_for_unsupported_formats(self):
 
484
        child_branch, new_child_transport = self.prepare_default_stacking(
 
485
            child_format='pack-0.92')
 
486
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
487
        self.assertRaises(errors.UnstackableBranchFormat,
 
488
                          new_child.open_branch().get_stacked_on_url)
 
489
 
 
490
    def test_sprout_ignores_policy_for_unsupported_formats(self):
 
491
        child_branch, new_child_transport = self.prepare_default_stacking(
 
492
            child_format='pack-0.92')
 
493
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
494
        self.assertRaises(errors.UnstackableBranchFormat,
 
495
                          new_child.open_branch().get_stacked_on_url)
 
496
 
 
497
    def test_sprout_upgrades_format_if_stacked_specified(self):
 
498
        child_branch, new_child_transport = self.prepare_default_stacking(
 
499
            child_format='pack-0.92')
 
500
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
 
501
                                               stacked=True)
 
502
        self.assertEqual(child_branch.bzrdir.root_transport.base,
 
503
                         new_child.open_branch().get_stacked_on_url())
 
504
        repo = new_child.open_repository()
 
505
        self.assertTrue(repo._format.supports_external_lookups)
 
506
        self.assertFalse(repo.supports_rich_root())
 
507
 
 
508
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
 
509
        child_branch, new_child_transport = self.prepare_default_stacking(
 
510
            child_format='pack-0.92')
 
511
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
 
512
            stacked_on=child_branch.bzrdir.root_transport.base)
 
513
        self.assertEqual(child_branch.bzrdir.root_transport.base,
 
514
                         new_child.open_branch().get_stacked_on_url())
 
515
        repo = new_child.open_repository()
 
516
        self.assertTrue(repo._format.supports_external_lookups)
 
517
        self.assertFalse(repo.supports_rich_root())
 
518
 
 
519
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
 
520
        child_branch, new_child_transport = self.prepare_default_stacking(
 
521
            child_format='rich-root-pack')
 
522
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
 
523
                                               stacked=True)
 
524
        repo = new_child.open_repository()
 
525
        self.assertTrue(repo._format.supports_external_lookups)
 
526
        self.assertTrue(repo.supports_rich_root())
 
527
 
 
528
    def test_add_fallback_repo_handles_absolute_urls(self):
 
529
        stack_on = self.make_branch('stack_on', format='1.6')
 
530
        repo = self.make_repository('repo', format='1.6')
 
531
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
 
532
        policy._add_fallback(repo)
 
533
 
 
534
    def test_add_fallback_repo_handles_relative_urls(self):
 
535
        stack_on = self.make_branch('stack_on', format='1.6')
 
536
        repo = self.make_repository('repo', format='1.6')
 
537
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
 
538
        policy._add_fallback(repo)
 
539
 
 
540
    def test_configure_relative_branch_stacking_url(self):
 
541
        stack_on = self.make_branch('stack_on', format='1.6')
 
542
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
543
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
544
            '.', stack_on.base)
 
545
        policy.configure_branch(stacked)
 
546
        self.assertEqual('..', stacked.get_stacked_on_url())
 
547
 
 
548
    def test_relative_branch_stacking_to_absolute(self):
 
549
        stack_on = self.make_branch('stack_on', format='1.6')
 
550
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
551
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
552
            '.', self.get_readonly_url('stack_on'))
 
553
        policy.configure_branch(stacked)
 
554
        self.assertEqual(self.get_readonly_url('stack_on'),
 
555
                         stacked.get_stacked_on_url())
 
556
 
 
557
 
 
558
class ChrootedTests(TestCaseWithTransport):
 
559
    """A support class that provides readonly urls outside the local namespace.
 
560
 
 
561
    This is done by checking if self.transport_server is a MemoryServer. if it
 
562
    is then we are chrooted already, if it is not then an HttpServer is used
 
563
    for readonly urls.
 
564
    """
 
565
 
 
566
    def setUp(self):
 
567
        super(ChrootedTests, self).setUp()
 
568
        if not self.vfs_transport_factory == MemoryServer:
 
569
            self.transport_readonly_server = http_server.HttpServer
 
570
 
 
571
    def local_branch_path(self, branch):
 
572
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
573
 
 
574
    def test_open_containing(self):
 
575
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
576
                          self.get_readonly_url(''))
 
577
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
578
                          self.get_readonly_url('g/p/q'))
 
579
        control = bzrdir.BzrDir.create(self.get_url())
 
580
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
 
581
        self.assertEqual('', relpath)
 
582
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
 
583
        self.assertEqual('g/p/q', relpath)
 
584
 
 
585
    def test_open_containing_tree_branch_or_repository_empty(self):
 
586
        self.assertRaises(errors.NotBranchError,
 
587
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
 
588
            self.get_readonly_url(''))
 
589
 
 
590
    def test_open_containing_tree_branch_or_repository_all(self):
 
591
        self.make_branch_and_tree('topdir')
 
592
        tree, branch, repo, relpath = \
 
593
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
594
                'topdir/foo')
 
595
        self.assertEqual(os.path.realpath('topdir'),
 
596
                         os.path.realpath(tree.basedir))
 
597
        self.assertEqual(os.path.realpath('topdir'),
 
598
                         self.local_branch_path(branch))
 
599
        self.assertEqual(
 
600
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
 
601
            repo.bzrdir.transport.local_abspath('repository'))
 
602
        self.assertEqual(relpath, 'foo')
 
603
 
 
604
    def test_open_containing_tree_branch_or_repository_no_tree(self):
 
605
        self.make_branch('branch')
 
606
        tree, branch, repo, relpath = \
 
607
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
608
                'branch/foo')
 
609
        self.assertEqual(tree, None)
 
610
        self.assertEqual(os.path.realpath('branch'),
 
611
                         self.local_branch_path(branch))
 
612
        self.assertEqual(
 
613
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
 
614
            repo.bzrdir.transport.local_abspath('repository'))
 
615
        self.assertEqual(relpath, 'foo')
 
616
 
 
617
    def test_open_containing_tree_branch_or_repository_repo(self):
 
618
        self.make_repository('repo')
 
619
        tree, branch, repo, relpath = \
 
620
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
621
                'repo')
 
622
        self.assertEqual(tree, None)
 
623
        self.assertEqual(branch, None)
 
624
        self.assertEqual(
 
625
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
 
626
            repo.bzrdir.transport.local_abspath('repository'))
 
627
        self.assertEqual(relpath, '')
 
628
 
 
629
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
 
630
        self.make_repository('shared', shared=True)
 
631
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
 
632
                                                force_new_tree=False)
 
633
        tree, branch, repo, relpath = \
 
634
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
635
                'shared/branch')
 
636
        self.assertEqual(tree, None)
 
637
        self.assertEqual(os.path.realpath('shared/branch'),
 
638
                         self.local_branch_path(branch))
 
639
        self.assertEqual(
 
640
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
 
641
            repo.bzrdir.transport.local_abspath('repository'))
 
642
        self.assertEqual(relpath, '')
 
643
 
 
644
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
 
645
        self.make_branch_and_tree('foo')
 
646
        self.build_tree(['foo/bar/'])
 
647
        tree, branch, repo, relpath = \
 
648
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
649
                'foo/bar')
 
650
        self.assertEqual(os.path.realpath('foo'),
 
651
                         os.path.realpath(tree.basedir))
 
652
        self.assertEqual(os.path.realpath('foo'),
 
653
                         self.local_branch_path(branch))
 
654
        self.assertEqual(
 
655
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
 
656
            repo.bzrdir.transport.local_abspath('repository'))
 
657
        self.assertEqual(relpath, 'bar')
 
658
 
 
659
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
 
660
        self.make_repository('bar')
 
661
        self.build_tree(['bar/baz/'])
 
662
        tree, branch, repo, relpath = \
 
663
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
664
                'bar/baz')
 
665
        self.assertEqual(tree, None)
 
666
        self.assertEqual(branch, None)
 
667
        self.assertEqual(
 
668
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
 
669
            repo.bzrdir.transport.local_abspath('repository'))
 
670
        self.assertEqual(relpath, 'baz')
 
671
 
 
672
    def test_open_containing_from_transport(self):
 
673
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
674
                          get_transport(self.get_readonly_url('')))
 
675
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
676
                          get_transport(self.get_readonly_url('g/p/q')))
 
677
        control = bzrdir.BzrDir.create(self.get_url())
 
678
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
679
            get_transport(self.get_readonly_url('')))
 
680
        self.assertEqual('', relpath)
 
681
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
682
            get_transport(self.get_readonly_url('g/p/q')))
 
683
        self.assertEqual('g/p/q', relpath)
 
684
 
 
685
    def test_open_containing_tree_or_branch(self):
 
686
        self.make_branch_and_tree('topdir')
 
687
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
688
            'topdir/foo')
 
689
        self.assertEqual(os.path.realpath('topdir'),
 
690
                         os.path.realpath(tree.basedir))
 
691
        self.assertEqual(os.path.realpath('topdir'),
 
692
                         self.local_branch_path(branch))
 
693
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
694
        self.assertEqual('foo', relpath)
 
695
        # opening from non-local should not return the tree
 
696
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
697
            self.get_readonly_url('topdir/foo'))
 
698
        self.assertEqual(None, tree)
 
699
        self.assertEqual('foo', relpath)
 
700
        # without a tree:
 
701
        self.make_branch('topdir/foo')
 
702
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
703
            'topdir/foo')
 
704
        self.assertIs(tree, None)
 
705
        self.assertEqual(os.path.realpath('topdir/foo'),
 
706
                         self.local_branch_path(branch))
 
707
        self.assertEqual('', relpath)
 
708
 
 
709
    def test_open_tree_or_branch(self):
 
710
        self.make_branch_and_tree('topdir')
 
711
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
 
712
        self.assertEqual(os.path.realpath('topdir'),
 
713
                         os.path.realpath(tree.basedir))
 
714
        self.assertEqual(os.path.realpath('topdir'),
 
715
                         self.local_branch_path(branch))
 
716
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
717
        # opening from non-local should not return the tree
 
718
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
 
719
            self.get_readonly_url('topdir'))
 
720
        self.assertEqual(None, tree)
 
721
        # without a tree:
 
722
        self.make_branch('topdir/foo')
 
723
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
 
724
        self.assertIs(tree, None)
 
725
        self.assertEqual(os.path.realpath('topdir/foo'),
 
726
                         self.local_branch_path(branch))
 
727
 
 
728
    def test_open_from_transport(self):
 
729
        # transport pointing at bzrdir should give a bzrdir with root transport
 
730
        # set to the given transport
 
731
        control = bzrdir.BzrDir.create(self.get_url())
 
732
        transport = get_transport(self.get_url())
 
733
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
 
734
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
 
735
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
 
736
 
 
737
    def test_open_from_transport_no_bzrdir(self):
 
738
        transport = get_transport(self.get_url())
 
739
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
740
                          transport)
 
741
 
 
742
    def test_open_from_transport_bzrdir_in_parent(self):
 
743
        control = bzrdir.BzrDir.create(self.get_url())
 
744
        transport = get_transport(self.get_url())
 
745
        transport.mkdir('subdir')
 
746
        transport = transport.clone('subdir')
 
747
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
748
                          transport)
 
749
 
 
750
    def test_sprout_recursive(self):
 
751
        tree = self.make_branch_and_tree('tree1',
 
752
                                         format='dirstate-with-subtree')
 
753
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
754
            format='dirstate-with-subtree')
 
755
        sub_tree.set_root_id('subtree-root')
 
756
        tree.add_reference(sub_tree)
 
757
        self.build_tree(['tree1/subtree/file'])
 
758
        sub_tree.add('file')
 
759
        tree.commit('Initial commit')
 
760
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
 
761
        tree2.lock_read()
 
762
        self.addCleanup(tree2.unlock)
 
763
        self.failUnlessExists('tree2/subtree/file')
 
764
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
 
765
 
 
766
    def test_cloning_metadir(self):
 
767
        """Ensure that cloning metadir is suitable"""
 
768
        bzrdir = self.make_bzrdir('bzrdir')
 
769
        bzrdir.cloning_metadir()
 
770
        branch = self.make_branch('branch', format='knit')
 
771
        format = branch.bzrdir.cloning_metadir()
 
772
        self.assertIsInstance(format.workingtree_format,
 
773
            workingtree.WorkingTreeFormat3)
 
774
 
 
775
    def test_sprout_recursive_treeless(self):
 
776
        tree = self.make_branch_and_tree('tree1',
 
777
            format='dirstate-with-subtree')
 
778
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
779
            format='dirstate-with-subtree')
 
780
        tree.add_reference(sub_tree)
 
781
        self.build_tree(['tree1/subtree/file'])
 
782
        sub_tree.add('file')
 
783
        tree.commit('Initial commit')
 
784
        tree.bzrdir.destroy_workingtree()
 
785
        repo = self.make_repository('repo', shared=True,
 
786
            format='dirstate-with-subtree')
 
787
        repo.set_make_working_trees(False)
 
788
        tree.bzrdir.sprout('repo/tree2')
 
789
        self.failUnlessExists('repo/tree2/subtree')
 
790
        self.failIfExists('repo/tree2/subtree/file')
 
791
 
 
792
    def make_foo_bar_baz(self):
 
793
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
 
794
        bar = self.make_branch('foo/bar').bzrdir
 
795
        baz = self.make_branch('baz').bzrdir
 
796
        return foo, bar, baz
 
797
 
 
798
    def test_find_bzrdirs(self):
 
799
        foo, bar, baz = self.make_foo_bar_baz()
 
800
        transport = get_transport(self.get_url())
 
801
        self.assertEqualBzrdirs([baz, foo, bar],
 
802
                                bzrdir.BzrDir.find_bzrdirs(transport))
 
803
 
 
804
    def test_find_bzrdirs_list_current(self):
 
805
        def list_current(transport):
 
806
            return [s for s in transport.list_dir('') if s != 'baz']
 
807
 
 
808
        foo, bar, baz = self.make_foo_bar_baz()
 
809
        transport = get_transport(self.get_url())
 
810
        self.assertEqualBzrdirs([foo, bar],
 
811
                                bzrdir.BzrDir.find_bzrdirs(transport,
 
812
                                    list_current=list_current))
 
813
 
 
814
 
 
815
    def test_find_bzrdirs_evaluate(self):
 
816
        def evaluate(bzrdir):
 
817
            try:
 
818
                repo = bzrdir.open_repository()
 
819
            except NoRepositoryPresent:
 
820
                return True, bzrdir.root_transport.base
 
821
            else:
 
822
                return False, bzrdir.root_transport.base
 
823
 
 
824
        foo, bar, baz = self.make_foo_bar_baz()
 
825
        transport = get_transport(self.get_url())
 
826
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
 
827
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
 
828
                                                         evaluate=evaluate)))
 
829
 
 
830
    def assertEqualBzrdirs(self, first, second):
 
831
        first = list(first)
 
832
        second = list(second)
 
833
        self.assertEqual(len(first), len(second))
 
834
        for x, y in zip(first, second):
 
835
            self.assertEqual(x.root_transport.base, y.root_transport.base)
 
836
 
 
837
    def test_find_branches(self):
 
838
        root = self.make_repository('', shared=True)
 
839
        foo, bar, baz = self.make_foo_bar_baz()
 
840
        qux = self.make_bzrdir('foo/qux')
 
841
        transport = get_transport(self.get_url())
 
842
        branches = bzrdir.BzrDir.find_branches(transport)
 
843
        self.assertEqual(baz.root_transport.base, branches[0].base)
 
844
        self.assertEqual(foo.root_transport.base, branches[1].base)
 
845
        self.assertEqual(bar.root_transport.base, branches[2].base)
 
846
 
 
847
        # ensure this works without a top-level repo
 
848
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
 
849
        self.assertEqual(foo.root_transport.base, branches[0].base)
 
850
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
851
 
 
852
 
 
853
class TestMeta1DirFormat(TestCaseWithTransport):
 
854
    """Tests specific to the meta1 dir format."""
 
855
 
 
856
    def test_right_base_dirs(self):
 
857
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
858
        t = dir.transport
 
859
        branch_base = t.clone('branch').base
 
860
        self.assertEqual(branch_base, dir.get_branch_transport(None).base)
 
861
        self.assertEqual(branch_base,
 
862
                         dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
 
863
        repository_base = t.clone('repository').base
 
864
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
 
865
        self.assertEqual(repository_base,
 
866
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
 
867
        checkout_base = t.clone('checkout').base
 
868
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
 
869
        self.assertEqual(checkout_base,
 
870
                         dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
 
871
 
 
872
    def test_meta1dir_uses_lockdir(self):
 
873
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
 
874
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
875
        t = dir.transport
 
876
        self.assertIsDirectory('branch-lock', t)
 
877
 
 
878
    def test_comparison(self):
 
879
        """Equality and inequality behave properly.
 
880
 
 
881
        Metadirs should compare equal iff they have the same repo, branch and
 
882
        tree formats.
 
883
        """
 
884
        mydir = bzrdir.format_registry.make_bzrdir('knit')
 
885
        self.assertEqual(mydir, mydir)
 
886
        self.assertFalse(mydir != mydir)
 
887
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
 
888
        self.assertEqual(otherdir, mydir)
 
889
        self.assertFalse(otherdir != mydir)
 
890
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
 
891
        self.assertNotEqual(otherdir2, mydir)
 
892
        self.assertFalse(otherdir2 == mydir)
 
893
 
 
894
    def test_needs_conversion_different_working_tree(self):
 
895
        # meta1dirs need an conversion if any element is not the default.
 
896
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
897
        tree = self.make_branch_and_tree('tree', format='knit')
 
898
        self.assertTrue(tree.bzrdir.needs_format_conversion(
 
899
            new_format))
 
900
 
 
901
    def test_initialize_on_format_uses_smart_transport(self):
 
902
        self.setup_smart_server_with_call_log()
 
903
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
904
        transport = self.get_transport('target')
 
905
        transport.ensure_base()
 
906
        self.reset_smart_call_log()
 
907
        instance = new_format.initialize_on_transport(transport)
 
908
        self.assertIsInstance(instance, remote.RemoteBzrDir)
 
909
        rpc_count = len(self.hpss_calls)
 
910
        # This figure represent the amount of work to perform this use case. It
 
911
        # is entirely ok to reduce this number if a test fails due to rpc_count
 
912
        # being too low. If rpc_count increases, more network roundtrips have
 
913
        # become necessary for this use case. Please do not adjust this number
 
914
        # upwards without agreement from bzr's network support maintainers.
 
915
        self.assertEqual(2, rpc_count)
 
916
 
 
917
 
 
918
class TestFormat5(TestCaseWithTransport):
 
919
    """Tests specific to the version 5 bzrdir format."""
 
920
 
 
921
    def test_same_lockfiles_between_tree_repo_branch(self):
 
922
        # this checks that only a single lockfiles instance is created
 
923
        # for format 5 objects
 
924
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
925
        def check_dir_components_use_same_lock(dir):
 
926
            ctrl_1 = dir.open_repository().control_files
 
927
            ctrl_2 = dir.open_branch().control_files
 
928
            ctrl_3 = dir.open_workingtree()._control_files
 
929
            self.assertTrue(ctrl_1 is ctrl_2)
 
930
            self.assertTrue(ctrl_2 is ctrl_3)
 
931
        check_dir_components_use_same_lock(dir)
 
932
        # and if we open it normally.
 
933
        dir = bzrdir.BzrDir.open(self.get_url())
 
934
        check_dir_components_use_same_lock(dir)
 
935
 
 
936
    def test_can_convert(self):
 
937
        # format 5 dirs are convertable
 
938
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
939
        self.assertTrue(dir.can_convert_format())
 
940
 
 
941
    def test_needs_conversion(self):
 
942
        # format 5 dirs need a conversion if they are not the default,
 
943
        # and they aren't
 
944
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
945
        # don't need to convert it to itself
 
946
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
 
947
        # do need to convert it to the current default
 
948
        self.assertTrue(dir.needs_format_conversion(
 
949
            bzrdir.BzrDirFormat.get_default_format()))
 
950
 
 
951
 
 
952
class TestFormat6(TestCaseWithTransport):
 
953
    """Tests specific to the version 6 bzrdir format."""
 
954
 
 
955
    def test_same_lockfiles_between_tree_repo_branch(self):
 
956
        # this checks that only a single lockfiles instance is created
 
957
        # for format 6 objects
 
958
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
959
        def check_dir_components_use_same_lock(dir):
 
960
            ctrl_1 = dir.open_repository().control_files
 
961
            ctrl_2 = dir.open_branch().control_files
 
962
            ctrl_3 = dir.open_workingtree()._control_files
 
963
            self.assertTrue(ctrl_1 is ctrl_2)
 
964
            self.assertTrue(ctrl_2 is ctrl_3)
 
965
        check_dir_components_use_same_lock(dir)
 
966
        # and if we open it normally.
 
967
        dir = bzrdir.BzrDir.open(self.get_url())
 
968
        check_dir_components_use_same_lock(dir)
 
969
 
 
970
    def test_can_convert(self):
 
971
        # format 6 dirs are convertable
 
972
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
973
        self.assertTrue(dir.can_convert_format())
 
974
 
 
975
    def test_needs_conversion(self):
 
976
        # format 6 dirs need an conversion if they are not the default.
 
977
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
978
        self.assertTrue(dir.needs_format_conversion(
 
979
            bzrdir.BzrDirFormat.get_default_format()))
 
980
 
 
981
 
 
982
class NotBzrDir(bzrlib.bzrdir.BzrDir):
 
983
    """A non .bzr based control directory."""
 
984
 
 
985
    def __init__(self, transport, format):
 
986
        self._format = format
 
987
        self.root_transport = transport
 
988
        self.transport = transport.clone('.not')
 
989
 
 
990
 
 
991
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
 
992
    """A test class representing any non-.bzr based disk format."""
 
993
 
 
994
    def initialize_on_transport(self, transport):
 
995
        """Initialize a new .not dir in the base directory of a Transport."""
 
996
        transport.mkdir('.not')
 
997
        return self.open(transport)
 
998
 
 
999
    def open(self, transport):
 
1000
        """Open this directory."""
 
1001
        return NotBzrDir(transport, self)
 
1002
 
 
1003
    @classmethod
 
1004
    def _known_formats(self):
 
1005
        return set([NotBzrDirFormat()])
 
1006
 
 
1007
    @classmethod
 
1008
    def probe_transport(self, transport):
 
1009
        """Our format is present if the transport ends in '.not/'."""
 
1010
        if transport.has('.not'):
 
1011
            return NotBzrDirFormat()
 
1012
 
 
1013
 
 
1014
class TestNotBzrDir(TestCaseWithTransport):
 
1015
    """Tests for using the bzrdir api with a non .bzr based disk format.
 
1016
 
 
1017
    If/when one of these is in the core, we can let the implementation tests
 
1018
    verify this works.
 
1019
    """
 
1020
 
 
1021
    def test_create_and_find_format(self):
 
1022
        # create a .notbzr dir
 
1023
        format = NotBzrDirFormat()
 
1024
        dir = format.initialize(self.get_url())
 
1025
        self.assertIsInstance(dir, NotBzrDir)
 
1026
        # now probe for it.
 
1027
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
 
1028
        try:
 
1029
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
 
1030
                get_transport(self.get_url()))
 
1031
            self.assertIsInstance(found, NotBzrDirFormat)
 
1032
        finally:
 
1033
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
 
1034
 
 
1035
    def test_included_in_known_formats(self):
 
1036
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
 
1037
        try:
 
1038
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
 
1039
            for format in formats:
 
1040
                if isinstance(format, NotBzrDirFormat):
 
1041
                    return
 
1042
            self.fail("No NotBzrDirFormat in %s" % formats)
 
1043
        finally:
 
1044
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
 
1045
 
 
1046
 
 
1047
class NonLocalTests(TestCaseWithTransport):
 
1048
    """Tests for bzrdir static behaviour on non local paths."""
 
1049
 
 
1050
    def setUp(self):
 
1051
        super(NonLocalTests, self).setUp()
 
1052
        self.vfs_transport_factory = MemoryServer
 
1053
 
 
1054
    def test_create_branch_convenience(self):
 
1055
        # outside a repo the default convenience output is a repo+branch_tree
 
1056
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1057
        branch = bzrdir.BzrDir.create_branch_convenience(
 
1058
            self.get_url('foo'), format=format)
 
1059
        self.assertRaises(errors.NoWorkingTree,
 
1060
                          branch.bzrdir.open_workingtree)
 
1061
        branch.bzrdir.open_repository()
 
1062
 
 
1063
    def test_create_branch_convenience_force_tree_not_local_fails(self):
 
1064
        # outside a repo the default convenience output is a repo+branch_tree
 
1065
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1066
        self.assertRaises(errors.NotLocalUrl,
 
1067
            bzrdir.BzrDir.create_branch_convenience,
 
1068
            self.get_url('foo'),
 
1069
            force_new_tree=True,
 
1070
            format=format)
 
1071
        t = get_transport(self.get_url('.'))
 
1072
        self.assertFalse(t.has('foo'))
 
1073
 
 
1074
    def test_clone(self):
 
1075
        # clone into a nonlocal path works
 
1076
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1077
        branch = bzrdir.BzrDir.create_branch_convenience('local',
 
1078
                                                         format=format)
 
1079
        branch.bzrdir.open_workingtree()
 
1080
        result = branch.bzrdir.clone(self.get_url('remote'))
 
1081
        self.assertRaises(errors.NoWorkingTree,
 
1082
                          result.open_workingtree)
 
1083
        result.open_branch()
 
1084
        result.open_repository()
 
1085
 
 
1086
    def test_checkout_metadir(self):
 
1087
        # checkout_metadir has reasonable working tree format even when no
 
1088
        # working tree is present
 
1089
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
 
1090
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
 
1091
        checkout_format = my_bzrdir.checkout_metadir()
 
1092
        self.assertIsInstance(checkout_format.workingtree_format,
 
1093
                              workingtree.WorkingTreeFormat3)
 
1094
 
 
1095
 
 
1096
class TestHTTPRedirections(object):
 
1097
    """Test redirection between two http servers.
 
1098
 
 
1099
    This MUST be used by daughter classes that also inherit from
 
1100
    TestCaseWithTwoWebservers.
 
1101
 
 
1102
    We can't inherit directly from TestCaseWithTwoWebservers or the
 
1103
    test framework will try to create an instance which cannot
 
1104
    run, its implementation being incomplete.
 
1105
    """
 
1106
 
 
1107
    def create_transport_readonly_server(self):
 
1108
        return http_utils.HTTPServerRedirecting()
 
1109
 
 
1110
    def create_transport_secondary_server(self):
 
1111
        return http_utils.HTTPServerRedirecting()
 
1112
 
 
1113
    def setUp(self):
 
1114
        super(TestHTTPRedirections, self).setUp()
 
1115
        # The redirections will point to the new server
 
1116
        self.new_server = self.get_readonly_server()
 
1117
        # The requests to the old server will be redirected
 
1118
        self.old_server = self.get_secondary_server()
 
1119
        # Configure the redirections
 
1120
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
1121
 
 
1122
    def test_loop(self):
 
1123
        # Both servers redirect to each other creating a loop
 
1124
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
1125
        # Starting from either server should loop
 
1126
        old_url = self._qualified_url(self.old_server.host,
 
1127
                                      self.old_server.port)
 
1128
        oldt = self._transport(old_url)
 
1129
        self.assertRaises(errors.NotBranchError,
 
1130
                          bzrdir.BzrDir.open_from_transport, oldt)
 
1131
        new_url = self._qualified_url(self.new_server.host,
 
1132
                                      self.new_server.port)
 
1133
        newt = self._transport(new_url)
 
1134
        self.assertRaises(errors.NotBranchError,
 
1135
                          bzrdir.BzrDir.open_from_transport, newt)
 
1136
 
 
1137
    def test_qualifier_preserved(self):
 
1138
        wt = self.make_branch_and_tree('branch')
 
1139
        old_url = self._qualified_url(self.old_server.host,
 
1140
                                      self.old_server.port)
 
1141
        start = self._transport(old_url).clone('branch')
 
1142
        bdir = bzrdir.BzrDir.open_from_transport(start)
 
1143
        # Redirection should preserve the qualifier, hence the transport class
 
1144
        # itself.
 
1145
        self.assertIsInstance(bdir.root_transport, type(start))
 
1146
 
 
1147
 
 
1148
class TestHTTPRedirections_urllib(TestHTTPRedirections,
 
1149
                                  http_utils.TestCaseWithTwoWebservers):
 
1150
    """Tests redirections for urllib implementation"""
 
1151
 
 
1152
    _transport = HttpTransport_urllib
 
1153
 
 
1154
    def _qualified_url(self, host, port):
 
1155
        result = 'http+urllib://%s:%s' % (host, port)
 
1156
        self.permit_url(result)
 
1157
        return result
 
1158
 
 
1159
 
 
1160
 
 
1161
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
 
1162
                                  TestHTTPRedirections,
 
1163
                                  http_utils.TestCaseWithTwoWebservers):
 
1164
    """Tests redirections for pycurl implementation"""
 
1165
 
 
1166
    def _qualified_url(self, host, port):
 
1167
        result = 'http+pycurl://%s:%s' % (host, port)
 
1168
        self.permit_url(result)
 
1169
        return result
 
1170
 
 
1171
 
 
1172
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
 
1173
                                  http_utils.TestCaseWithTwoWebservers):
 
1174
    """Tests redirections for the nosmart decorator"""
 
1175
 
 
1176
    _transport = NoSmartTransportDecorator
 
1177
 
 
1178
    def _qualified_url(self, host, port):
 
1179
        result = 'nosmart+http://%s:%s' % (host, port)
 
1180
        self.permit_url(result)
 
1181
        return result
 
1182
 
 
1183
 
 
1184
class TestHTTPRedirections_readonly(TestHTTPRedirections,
 
1185
                                    http_utils.TestCaseWithTwoWebservers):
 
1186
    """Tests redirections for readonly decoratror"""
 
1187
 
 
1188
    _transport = ReadonlyTransportDecorator
 
1189
 
 
1190
    def _qualified_url(self, host, port):
 
1191
        result = 'readonly+http://%s:%s' % (host, port)
 
1192
        self.permit_url(result)
 
1193
        return result
 
1194
 
 
1195
 
 
1196
class TestDotBzrHidden(TestCaseWithTransport):
 
1197
 
 
1198
    ls = ['ls']
 
1199
    if sys.platform == 'win32':
 
1200
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
 
1201
 
 
1202
    def get_ls(self):
 
1203
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
 
1204
            stderr=subprocess.PIPE)
 
1205
        out, err = f.communicate()
 
1206
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
 
1207
                         % (self.ls, err))
 
1208
        return out.splitlines()
 
1209
 
 
1210
    def test_dot_bzr_hidden(self):
 
1211
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1212
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1213
        b = bzrdir.BzrDir.create('.')
 
1214
        self.build_tree(['a'])
 
1215
        self.assertEquals(['a'], self.get_ls())
 
1216
 
 
1217
    def test_dot_bzr_hidden_with_url(self):
 
1218
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1219
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1220
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
 
1221
        self.build_tree(['a'])
 
1222
        self.assertEquals(['a'], self.get_ls())
 
1223
 
 
1224
 
 
1225
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
 
1226
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
 
1227
 
 
1228
    def _open(self, transport):
 
1229
        return _TestBzrDir(transport, self)
 
1230
 
 
1231
 
 
1232
class _TestBzrDir(bzrdir.BzrDirMeta1):
 
1233
    """Test BzrDir implementation for TestBzrDirSprout.
 
1234
 
 
1235
    When created a _TestBzrDir already has repository and a branch.  The branch
 
1236
    is a test double as well.
 
1237
    """
 
1238
 
 
1239
    def __init__(self, *args, **kwargs):
 
1240
        super(_TestBzrDir, self).__init__(*args, **kwargs)
 
1241
        self.test_branch = _TestBranch()
 
1242
        self.test_branch.repository = self.create_repository()
 
1243
 
 
1244
    def open_branch(self, unsupported=False):
 
1245
        return self.test_branch
 
1246
 
 
1247
    def cloning_metadir(self, require_stacking=False):
 
1248
        return _TestBzrDirFormat()
 
1249
 
 
1250
 
 
1251
class _TestBranchFormat(bzrlib.branch.BranchFormat):
 
1252
    """Test Branch format for TestBzrDirSprout."""
 
1253
 
 
1254
 
 
1255
class _TestBranch(bzrlib.branch.Branch):
 
1256
    """Test Branch implementation for TestBzrDirSprout."""
 
1257
 
 
1258
    def __init__(self, *args, **kwargs):
 
1259
        self._format = _TestBranchFormat()
 
1260
        super(_TestBranch, self).__init__(*args, **kwargs)
 
1261
        self.calls = []
 
1262
        self._parent = None
 
1263
 
 
1264
    def sprout(self, *args, **kwargs):
 
1265
        self.calls.append('sprout')
 
1266
        return _TestBranch()
 
1267
 
 
1268
    def copy_content_into(self, destination, revision_id=None):
 
1269
        self.calls.append('copy_content_into')
 
1270
 
 
1271
    def get_parent(self):
 
1272
        return self._parent
 
1273
 
 
1274
    def set_parent(self, parent):
 
1275
        self._parent = parent
 
1276
 
 
1277
 
 
1278
class TestBzrDirSprout(TestCaseWithMemoryTransport):
 
1279
 
 
1280
    def test_sprout_uses_branch_sprout(self):
 
1281
        """BzrDir.sprout calls Branch.sprout.
 
1282
 
 
1283
        Usually, BzrDir.sprout should delegate to the branch's sprout method
 
1284
        for part of the work.  This allows the source branch to control the
 
1285
        choice of format for the new branch.
 
1286
 
 
1287
        There are exceptions, but this tests avoids them:
 
1288
          - if there's no branch in the source bzrdir,
 
1289
          - or if the stacking has been requested and the format needs to be
 
1290
            overridden to satisfy that.
 
1291
        """
 
1292
        # Make an instrumented bzrdir.
 
1293
        t = self.get_transport('source')
 
1294
        t.ensure_base()
 
1295
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
 
1296
        # The instrumented bzrdir has a test_branch attribute that logs calls
 
1297
        # made to the branch contained in that bzrdir.  Initially the test
 
1298
        # branch exists but no calls have been made to it.
 
1299
        self.assertEqual([], source_bzrdir.test_branch.calls)
 
1300
 
 
1301
        # Sprout the bzrdir
 
1302
        target_url = self.get_url('target')
 
1303
        result = source_bzrdir.sprout(target_url, recurse='no')
 
1304
 
 
1305
        # The bzrdir called the branch's sprout method.
 
1306
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
 
1307
 
 
1308
    def test_sprout_parent(self):
 
1309
        grandparent_tree = self.make_branch('grandparent')
 
1310
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
 
1311
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
 
1312
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
 
1313
 
 
1314
 
 
1315
class TestBzrDirHooks(TestCaseWithMemoryTransport):
 
1316
 
 
1317
    def test_pre_open_called(self):
 
1318
        calls = []
 
1319
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
 
1320
        transport = self.get_transport('foo')
 
1321
        url = transport.base
 
1322
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
 
1323
        self.assertEqual([transport.base], [t.base for t in calls])
 
1324
 
 
1325
    def test_pre_open_actual_exceptions_raised(self):
 
1326
        count = [0]
 
1327
        def fail_once(transport):
 
1328
            count[0] += 1
 
1329
            if count[0] == 1:
 
1330
                raise errors.BzrError("fail")
 
1331
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
 
1332
        transport = self.get_transport('foo')
 
1333
        url = transport.base
 
1334
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
 
1335
        self.assertEqual('fail', err._preformatted_string)