~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2009-06-10 01:02:49 UTC
  • mfrom: (4420.2.2 1.16-bencode-compat-385212)
  • Revision ID: pqm@pqm.ubuntu.com-20090610010249-5iyq9oics6tysru4
(jam) restore a compatibility module at bzrlib.util.bencode (bug
        #385212)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/bzr_dir_implementations.
 
20
"""
 
21
 
 
22
import os
 
23
import os.path
 
24
from StringIO import StringIO
 
25
import subprocess
 
26
import sys
 
27
 
 
28
from bzrlib import (
 
29
    bzrdir,
 
30
    errors,
 
31
    help_topics,
 
32
    repository,
 
33
    osutils,
 
34
    symbol_versioning,
 
35
    remote,
 
36
    urlutils,
 
37
    win32utils,
 
38
    workingtree,
 
39
    )
 
40
import bzrlib.branch
 
41
from bzrlib.errors import (NotBranchError,
 
42
                           UnknownFormatError,
 
43
                           UnsupportedFormatError,
 
44
                           )
 
45
from bzrlib.tests import (
 
46
    TestCase,
 
47
    TestCaseWithMemoryTransport,
 
48
    TestCaseWithTransport,
 
49
    TestSkipped,
 
50
    test_sftp_transport
 
51
    )
 
52
from bzrlib.tests import(
 
53
    http_server,
 
54
    http_utils,
 
55
    )
 
56
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
57
from bzrlib.transport import get_transport
 
58
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
59
from bzrlib.transport.memory import MemoryServer
 
60
from bzrlib.transport.nosmart import NoSmartTransportDecorator
 
61
from bzrlib.transport.readonly import ReadonlyTransportDecorator
 
62
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
63
 
 
64
 
 
65
class TestDefaultFormat(TestCase):
 
66
 
 
67
    def test_get_set_default_format(self):
 
68
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
69
        # default is BzrDirFormat6
 
70
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
 
71
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
72
        # creating a bzr dir should now create an instrumented dir.
 
73
        try:
 
74
            result = bzrdir.BzrDir.create('memory:///')
 
75
            self.failUnless(isinstance(result, SampleBzrDir))
 
76
        finally:
 
77
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
78
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
79
 
 
80
 
 
81
class TestFormatRegistry(TestCase):
 
82
 
 
83
    def make_format_registry(self):
 
84
        my_format_registry = bzrdir.BzrDirFormatRegistry()
 
85
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
 
86
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
87
            ' repositories', deprecated=True)
 
88
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
 
89
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
 
90
        my_format_registry.register_metadir('knit',
 
91
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
 
92
            'Format using knits',
 
93
            )
 
94
        my_format_registry.set_default('knit')
 
95
        my_format_registry.register_metadir(
 
96
            'branch6',
 
97
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
98
            'Experimental successor to knit.  Use at your own risk.',
 
99
            branch_format='bzrlib.branch.BzrBranchFormat6',
 
100
            experimental=True)
 
101
        my_format_registry.register_metadir(
 
102
            'hidden format',
 
103
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
 
104
            'Experimental successor to knit.  Use at your own risk.',
 
105
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
 
106
        my_format_registry.register('hiddenweave', bzrdir.BzrDirFormat6,
 
107
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
108
            ' repositories', hidden=True)
 
109
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir',
 
110
            'BzrDirFormat6', 'Format registered lazily', deprecated=True,
 
111
            hidden=True)
 
112
        return my_format_registry
 
113
 
 
114
    def test_format_registry(self):
 
115
        my_format_registry = self.make_format_registry()
 
116
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
 
117
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
118
        my_bzrdir = my_format_registry.make_bzrdir('weave')
 
119
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
 
120
        my_bzrdir = my_format_registry.make_bzrdir('default')
 
121
        self.assertIsInstance(my_bzrdir.repository_format,
 
122
            knitrepo.RepositoryFormatKnit1)
 
123
        my_bzrdir = my_format_registry.make_bzrdir('knit')
 
124
        self.assertIsInstance(my_bzrdir.repository_format,
 
125
            knitrepo.RepositoryFormatKnit1)
 
126
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
 
127
        self.assertIsInstance(my_bzrdir.get_branch_format(),
 
128
                              bzrlib.branch.BzrBranchFormat6)
 
129
 
 
130
    def test_get_help(self):
 
131
        my_format_registry = self.make_format_registry()
 
132
        self.assertEqual('Format registered lazily',
 
133
                         my_format_registry.get_help('lazy'))
 
134
        self.assertEqual('Format using knits',
 
135
                         my_format_registry.get_help('knit'))
 
136
        self.assertEqual('Format using knits',
 
137
                         my_format_registry.get_help('default'))
 
138
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
 
139
                         ' checkouts or shared repositories',
 
140
                         my_format_registry.get_help('weave'))
 
141
 
 
142
    def test_help_topic(self):
 
143
        topics = help_topics.HelpTopicRegistry()
 
144
        registry = self.make_format_registry()
 
145
        topics.register('current-formats', registry.help_topic,
 
146
                        'Current formats')
 
147
        topics.register('other-formats', registry.help_topic,
 
148
                        'Other formats')
 
149
        new = topics.get_detail('current-formats')
 
150
        rest = topics.get_detail('other-formats')
 
151
        experimental, deprecated = rest.split('Deprecated formats')
 
152
        self.assertContainsRe(new, 'bzr help formats')
 
153
        self.assertContainsRe(new,
 
154
                ':knit:\n    \(native\) \(default\) Format using knits\n')
 
155
        self.assertContainsRe(experimental,
 
156
                ':branch6:\n    \(native\) Experimental successor to knit')
 
157
        self.assertContainsRe(deprecated,
 
158
                ':lazy:\n    \(native\) Format registered lazily\n')
 
159
        self.assertNotContainsRe(new, 'hidden')
 
160
 
 
161
    def test_set_default_repository(self):
 
162
        default_factory = bzrdir.format_registry.get('default')
 
163
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
 
164
                       if v == default_factory and k != 'default'][0]
 
165
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
 
166
        try:
 
167
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
 
168
                          bzrdir.format_registry.get('default'))
 
169
            self.assertIs(
 
170
                repository.RepositoryFormat.get_default_format().__class__,
 
171
                knitrepo.RepositoryFormatKnit3)
 
172
        finally:
 
173
            bzrdir.format_registry.set_default_repository(old_default)
 
174
 
 
175
    def test_aliases(self):
 
176
        a_registry = bzrdir.BzrDirFormatRegistry()
 
177
        a_registry.register('weave', bzrdir.BzrDirFormat6,
 
178
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
179
            ' repositories', deprecated=True)
 
180
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
 
181
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
 
182
            ' repositories', deprecated=True, alias=True)
 
183
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
 
184
 
 
185
 
 
186
class SampleBranch(bzrlib.branch.Branch):
 
187
    """A dummy branch for guess what, dummy use."""
 
188
 
 
189
    def __init__(self, dir):
 
190
        self.bzrdir = dir
 
191
 
 
192
 
 
193
class SampleRepository(bzrlib.repository.Repository):
 
194
    """A dummy repo."""
 
195
 
 
196
    def __init__(self, dir):
 
197
        self.bzrdir = dir
 
198
 
 
199
 
 
200
class SampleBzrDir(bzrdir.BzrDir):
 
201
    """A sample BzrDir implementation to allow testing static methods."""
 
202
 
 
203
    def create_repository(self, shared=False):
 
204
        """See BzrDir.create_repository."""
 
205
        return "A repository"
 
206
 
 
207
    def open_repository(self):
 
208
        """See BzrDir.open_repository."""
 
209
        return SampleRepository(self)
 
210
 
 
211
    def create_branch(self):
 
212
        """See BzrDir.create_branch."""
 
213
        return SampleBranch(self)
 
214
 
 
215
    def create_workingtree(self):
 
216
        """See BzrDir.create_workingtree."""
 
217
        return "A tree"
 
218
 
 
219
 
 
220
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
 
221
    """A sample format
 
222
 
 
223
    this format is initializable, unsupported to aid in testing the
 
224
    open and open_downlevel routines.
 
225
    """
 
226
 
 
227
    def get_format_string(self):
 
228
        """See BzrDirFormat.get_format_string()."""
 
229
        return "Sample .bzr dir format."
 
230
 
 
231
    def initialize_on_transport(self, t):
 
232
        """Create a bzr dir."""
 
233
        t.mkdir('.bzr')
 
234
        t.put_bytes('.bzr/branch-format', self.get_format_string())
 
235
        return SampleBzrDir(t, self)
 
236
 
 
237
    def is_supported(self):
 
238
        return False
 
239
 
 
240
    def open(self, transport, _found=None):
 
241
        return "opened branch."
 
242
 
 
243
 
 
244
class TestBzrDirFormat(TestCaseWithTransport):
 
245
    """Tests for the BzrDirFormat facility."""
 
246
 
 
247
    def test_find_format(self):
 
248
        # is the right format object found for a branch?
 
249
        # create a branch with a few known format objects.
 
250
        # this is not quite the same as
 
251
        t = get_transport(self.get_url())
 
252
        self.build_tree(["foo/", "bar/"], transport=t)
 
253
        def check_format(format, url):
 
254
            format.initialize(url)
 
255
            t = get_transport(url)
 
256
            found_format = bzrdir.BzrDirFormat.find_format(t)
 
257
            self.failUnless(isinstance(found_format, format.__class__))
 
258
        check_format(bzrdir.BzrDirFormat5(), "foo")
 
259
        check_format(bzrdir.BzrDirFormat6(), "bar")
 
260
 
 
261
    def test_find_format_nothing_there(self):
 
262
        self.assertRaises(NotBranchError,
 
263
                          bzrdir.BzrDirFormat.find_format,
 
264
                          get_transport('.'))
 
265
 
 
266
    def test_find_format_unknown_format(self):
 
267
        t = get_transport(self.get_url())
 
268
        t.mkdir('.bzr')
 
269
        t.put_bytes('.bzr/branch-format', '')
 
270
        self.assertRaises(UnknownFormatError,
 
271
                          bzrdir.BzrDirFormat.find_format,
 
272
                          get_transport('.'))
 
273
 
 
274
    def test_register_unregister_format(self):
 
275
        format = SampleBzrDirFormat()
 
276
        url = self.get_url()
 
277
        # make a bzrdir
 
278
        format.initialize(url)
 
279
        # register a format for it.
 
280
        bzrdir.BzrDirFormat.register_format(format)
 
281
        # which bzrdir.Open will refuse (not supported)
 
282
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
 
283
        # which bzrdir.open_containing will refuse (not supported)
 
284
        self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open_containing, url)
 
285
        # but open_downlevel will work
 
286
        t = get_transport(url)
 
287
        self.assertEqual(format.open(t), bzrdir.BzrDir.open_unsupported(url))
 
288
        # unregister the format
 
289
        bzrdir.BzrDirFormat.unregister_format(format)
 
290
        # now open_downlevel should fail too.
 
291
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
 
292
 
 
293
    def test_create_branch_and_repo_uses_default(self):
 
294
        format = SampleBzrDirFormat()
 
295
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
296
                                                      format=format)
 
297
        self.assertTrue(isinstance(branch, SampleBranch))
 
298
 
 
299
    def test_create_branch_and_repo_under_shared(self):
 
300
        # creating a branch and repo in a shared repo uses the
 
301
        # shared repository
 
302
        format = bzrdir.format_registry.make_bzrdir('knit')
 
303
        self.make_repository('.', shared=True, format=format)
 
304
        branch = bzrdir.BzrDir.create_branch_and_repo(
 
305
            self.get_url('child'), format=format)
 
306
        self.assertRaises(errors.NoRepositoryPresent,
 
307
                          branch.bzrdir.open_repository)
 
308
 
 
309
    def test_create_branch_and_repo_under_shared_force_new(self):
 
310
        # creating a branch and repo in a shared repo can be forced to
 
311
        # make a new repo
 
312
        format = bzrdir.format_registry.make_bzrdir('knit')
 
313
        self.make_repository('.', shared=True, format=format)
 
314
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
 
315
                                                      force_new_repo=True,
 
316
                                                      format=format)
 
317
        branch.bzrdir.open_repository()
 
318
 
 
319
    def test_create_standalone_working_tree(self):
 
320
        format = SampleBzrDirFormat()
 
321
        # note this is deliberately readonly, as this failure should
 
322
        # occur before any writes.
 
323
        self.assertRaises(errors.NotLocalUrl,
 
324
                          bzrdir.BzrDir.create_standalone_workingtree,
 
325
                          self.get_readonly_url(), format=format)
 
326
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
 
327
                                                           format=format)
 
328
        self.assertEqual('A tree', tree)
 
329
 
 
330
    def test_create_standalone_working_tree_under_shared_repo(self):
 
331
        # create standalone working tree always makes a repo.
 
332
        format = bzrdir.format_registry.make_bzrdir('knit')
 
333
        self.make_repository('.', shared=True, format=format)
 
334
        # note this is deliberately readonly, as this failure should
 
335
        # occur before any writes.
 
336
        self.assertRaises(errors.NotLocalUrl,
 
337
                          bzrdir.BzrDir.create_standalone_workingtree,
 
338
                          self.get_readonly_url('child'), format=format)
 
339
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
 
340
            format=format)
 
341
        tree.bzrdir.open_repository()
 
342
 
 
343
    def test_create_branch_convenience(self):
 
344
        # outside a repo the default convenience output is a repo+branch_tree
 
345
        format = bzrdir.format_registry.make_bzrdir('knit')
 
346
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
 
347
        branch.bzrdir.open_workingtree()
 
348
        branch.bzrdir.open_repository()
 
349
 
 
350
    def test_create_branch_convenience_possible_transports(self):
 
351
        """Check that the optional 'possible_transports' is recognized"""
 
352
        format = bzrdir.format_registry.make_bzrdir('knit')
 
353
        t = self.get_transport()
 
354
        branch = bzrdir.BzrDir.create_branch_convenience(
 
355
            '.', format=format, possible_transports=[t])
 
356
        branch.bzrdir.open_workingtree()
 
357
        branch.bzrdir.open_repository()
 
358
 
 
359
    def test_create_branch_convenience_root(self):
 
360
        """Creating a branch at the root of a fs should work."""
 
361
        self.vfs_transport_factory = MemoryServer
 
362
        # outside a repo the default convenience output is a repo+branch_tree
 
363
        format = bzrdir.format_registry.make_bzrdir('knit')
 
364
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
 
365
                                                         format=format)
 
366
        self.assertRaises(errors.NoWorkingTree,
 
367
                          branch.bzrdir.open_workingtree)
 
368
        branch.bzrdir.open_repository()
 
369
 
 
370
    def test_create_branch_convenience_under_shared_repo(self):
 
371
        # inside a repo the default convenience output is a branch+ follow the
 
372
        # repo tree policy
 
373
        format = bzrdir.format_registry.make_bzrdir('knit')
 
374
        self.make_repository('.', shared=True, format=format)
 
375
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
376
            format=format)
 
377
        branch.bzrdir.open_workingtree()
 
378
        self.assertRaises(errors.NoRepositoryPresent,
 
379
                          branch.bzrdir.open_repository)
 
380
 
 
381
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
 
382
        # inside a repo the default convenience output is a branch+ follow the
 
383
        # repo tree policy but we can override that
 
384
        format = bzrdir.format_registry.make_bzrdir('knit')
 
385
        self.make_repository('.', shared=True, format=format)
 
386
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
387
            force_new_tree=False, format=format)
 
388
        self.assertRaises(errors.NoWorkingTree,
 
389
                          branch.bzrdir.open_workingtree)
 
390
        self.assertRaises(errors.NoRepositoryPresent,
 
391
                          branch.bzrdir.open_repository)
 
392
 
 
393
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
 
394
        # inside a repo the default convenience output is a branch+ follow the
 
395
        # repo tree policy
 
396
        format = bzrdir.format_registry.make_bzrdir('knit')
 
397
        repo = self.make_repository('.', shared=True, format=format)
 
398
        repo.set_make_working_trees(False)
 
399
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
400
                                                         format=format)
 
401
        self.assertRaises(errors.NoWorkingTree,
 
402
                          branch.bzrdir.open_workingtree)
 
403
        self.assertRaises(errors.NoRepositoryPresent,
 
404
                          branch.bzrdir.open_repository)
 
405
 
 
406
    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
 
407
        # inside a repo the default convenience output is a branch+ follow the
 
408
        # repo tree policy but we can override that
 
409
        format = bzrdir.format_registry.make_bzrdir('knit')
 
410
        repo = self.make_repository('.', shared=True, format=format)
 
411
        repo.set_make_working_trees(False)
 
412
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
413
            force_new_tree=True, format=format)
 
414
        branch.bzrdir.open_workingtree()
 
415
        self.assertRaises(errors.NoRepositoryPresent,
 
416
                          branch.bzrdir.open_repository)
 
417
 
 
418
    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
 
419
        # inside a repo the default convenience output is overridable to give
 
420
        # repo+branch+tree
 
421
        format = bzrdir.format_registry.make_bzrdir('knit')
 
422
        self.make_repository('.', shared=True, format=format)
 
423
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
424
            force_new_repo=True, format=format)
 
425
        branch.bzrdir.open_repository()
 
426
        branch.bzrdir.open_workingtree()
 
427
 
 
428
 
 
429
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
 
430
 
 
431
    def test_acquire_repository_standalone(self):
 
432
        """The default acquisition policy should create a standalone branch."""
 
433
        my_bzrdir = self.make_bzrdir('.')
 
434
        repo_policy = my_bzrdir.determine_repository_policy()
 
435
        repo, is_new = repo_policy.acquire_repository()
 
436
        self.assertEqual(repo.bzrdir.root_transport.base,
 
437
                         my_bzrdir.root_transport.base)
 
438
        self.assertFalse(repo.is_shared())
 
439
 
 
440
    def test_determine_stacking_policy(self):
 
441
        parent_bzrdir = self.make_bzrdir('.')
 
442
        child_bzrdir = self.make_bzrdir('child')
 
443
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
 
444
        repo_policy = child_bzrdir.determine_repository_policy()
 
445
        self.assertEqual('http://example.org', repo_policy._stack_on)
 
446
 
 
447
    def test_determine_stacking_policy_relative(self):
 
448
        parent_bzrdir = self.make_bzrdir('.')
 
449
        child_bzrdir = self.make_bzrdir('child')
 
450
        parent_bzrdir.get_config().set_default_stack_on('child2')
 
451
        repo_policy = child_bzrdir.determine_repository_policy()
 
452
        self.assertEqual('child2', repo_policy._stack_on)
 
453
        self.assertEqual(parent_bzrdir.root_transport.base,
 
454
                         repo_policy._stack_on_pwd)
 
455
 
 
456
    def prepare_default_stacking(self, child_format='1.6'):
 
457
        parent_bzrdir = self.make_bzrdir('.')
 
458
        child_branch = self.make_branch('child', format=child_format)
 
459
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
 
460
        new_child_transport = parent_bzrdir.transport.clone('child2')
 
461
        return child_branch, new_child_transport
 
462
 
 
463
    def test_clone_on_transport_obeys_stacking_policy(self):
 
464
        child_branch, new_child_transport = self.prepare_default_stacking()
 
465
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
466
        self.assertEqual(child_branch.base,
 
467
                         new_child.open_branch().get_stacked_on_url())
 
468
 
 
469
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
 
470
        # Make stackable source branch with an unstackable repo format.
 
471
        source_bzrdir = self.make_bzrdir('source')
 
472
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
 
473
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
 
474
        # Make a directory with a default stacking policy
 
475
        parent_bzrdir = self.make_bzrdir('parent')
 
476
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
 
477
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
 
478
        # Clone source into directory
 
479
        target = source_bzrdir.clone(self.get_url('parent/target'))
 
480
 
 
481
    def test_sprout_obeys_stacking_policy(self):
 
482
        child_branch, new_child_transport = self.prepare_default_stacking()
 
483
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
484
        self.assertEqual(child_branch.base,
 
485
                         new_child.open_branch().get_stacked_on_url())
 
486
 
 
487
    def test_clone_ignores_policy_for_unsupported_formats(self):
 
488
        child_branch, new_child_transport = self.prepare_default_stacking(
 
489
            child_format='pack-0.92')
 
490
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
 
491
        self.assertRaises(errors.UnstackableBranchFormat,
 
492
                          new_child.open_branch().get_stacked_on_url)
 
493
 
 
494
    def test_sprout_ignores_policy_for_unsupported_formats(self):
 
495
        child_branch, new_child_transport = self.prepare_default_stacking(
 
496
            child_format='pack-0.92')
 
497
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
 
498
        self.assertRaises(errors.UnstackableBranchFormat,
 
499
                          new_child.open_branch().get_stacked_on_url)
 
500
 
 
501
    def test_sprout_upgrades_format_if_stacked_specified(self):
 
502
        child_branch, new_child_transport = self.prepare_default_stacking(
 
503
            child_format='pack-0.92')
 
504
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
 
505
                                               stacked=True)
 
506
        self.assertEqual(child_branch.bzrdir.root_transport.base,
 
507
                         new_child.open_branch().get_stacked_on_url())
 
508
        repo = new_child.open_repository()
 
509
        self.assertTrue(repo._format.supports_external_lookups)
 
510
        self.assertFalse(repo.supports_rich_root())
 
511
 
 
512
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
 
513
        child_branch, new_child_transport = self.prepare_default_stacking(
 
514
            child_format='pack-0.92')
 
515
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
 
516
            stacked_on=child_branch.bzrdir.root_transport.base)
 
517
        self.assertEqual(child_branch.bzrdir.root_transport.base,
 
518
                         new_child.open_branch().get_stacked_on_url())
 
519
        repo = new_child.open_repository()
 
520
        self.assertTrue(repo._format.supports_external_lookups)
 
521
        self.assertFalse(repo.supports_rich_root())
 
522
 
 
523
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
 
524
        child_branch, new_child_transport = self.prepare_default_stacking(
 
525
            child_format='rich-root-pack')
 
526
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
 
527
                                               stacked=True)
 
528
        repo = new_child.open_repository()
 
529
        self.assertTrue(repo._format.supports_external_lookups)
 
530
        self.assertTrue(repo.supports_rich_root())
 
531
 
 
532
    def test_add_fallback_repo_handles_absolute_urls(self):
 
533
        stack_on = self.make_branch('stack_on', format='1.6')
 
534
        repo = self.make_repository('repo', format='1.6')
 
535
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
 
536
        policy._add_fallback(repo)
 
537
 
 
538
    def test_add_fallback_repo_handles_relative_urls(self):
 
539
        stack_on = self.make_branch('stack_on', format='1.6')
 
540
        repo = self.make_repository('repo', format='1.6')
 
541
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
 
542
        policy._add_fallback(repo)
 
543
 
 
544
    def test_configure_relative_branch_stacking_url(self):
 
545
        stack_on = self.make_branch('stack_on', format='1.6')
 
546
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
547
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
548
            '.', stack_on.base)
 
549
        policy.configure_branch(stacked)
 
550
        self.assertEqual('..', stacked.get_stacked_on_url())
 
551
 
 
552
    def test_relative_branch_stacking_to_absolute(self):
 
553
        stack_on = self.make_branch('stack_on', format='1.6')
 
554
        stacked = self.make_branch('stack_on/stacked', format='1.6')
 
555
        policy = bzrdir.UseExistingRepository(stacked.repository,
 
556
            '.', self.get_readonly_url('stack_on'))
 
557
        policy.configure_branch(stacked)
 
558
        self.assertEqual(self.get_readonly_url('stack_on'),
 
559
                         stacked.get_stacked_on_url())
 
560
 
 
561
 
 
562
class ChrootedTests(TestCaseWithTransport):
 
563
    """A support class that provides readonly urls outside the local namespace.
 
564
 
 
565
    This is done by checking if self.transport_server is a MemoryServer. if it
 
566
    is then we are chrooted already, if it is not then an HttpServer is used
 
567
    for readonly urls.
 
568
    """
 
569
 
 
570
    def setUp(self):
 
571
        super(ChrootedTests, self).setUp()
 
572
        if not self.vfs_transport_factory == MemoryServer:
 
573
            self.transport_readonly_server = http_server.HttpServer
 
574
 
 
575
    def local_branch_path(self, branch):
 
576
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
577
 
 
578
    def test_open_containing(self):
 
579
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
580
                          self.get_readonly_url(''))
 
581
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
582
                          self.get_readonly_url('g/p/q'))
 
583
        control = bzrdir.BzrDir.create(self.get_url())
 
584
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
 
585
        self.assertEqual('', relpath)
 
586
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
 
587
        self.assertEqual('g/p/q', relpath)
 
588
 
 
589
    def test_open_containing_tree_branch_or_repository_empty(self):
 
590
        self.assertRaises(errors.NotBranchError,
 
591
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
 
592
            self.get_readonly_url(''))
 
593
 
 
594
    def test_open_containing_tree_branch_or_repository_all(self):
 
595
        self.make_branch_and_tree('topdir')
 
596
        tree, branch, repo, relpath = \
 
597
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
598
                'topdir/foo')
 
599
        self.assertEqual(os.path.realpath('topdir'),
 
600
                         os.path.realpath(tree.basedir))
 
601
        self.assertEqual(os.path.realpath('topdir'),
 
602
                         self.local_branch_path(branch))
 
603
        self.assertEqual(
 
604
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
 
605
            repo.bzrdir.transport.local_abspath('repository'))
 
606
        self.assertEqual(relpath, 'foo')
 
607
 
 
608
    def test_open_containing_tree_branch_or_repository_no_tree(self):
 
609
        self.make_branch('branch')
 
610
        tree, branch, repo, relpath = \
 
611
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
612
                'branch/foo')
 
613
        self.assertEqual(tree, None)
 
614
        self.assertEqual(os.path.realpath('branch'),
 
615
                         self.local_branch_path(branch))
 
616
        self.assertEqual(
 
617
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
 
618
            repo.bzrdir.transport.local_abspath('repository'))
 
619
        self.assertEqual(relpath, 'foo')
 
620
 
 
621
    def test_open_containing_tree_branch_or_repository_repo(self):
 
622
        self.make_repository('repo')
 
623
        tree, branch, repo, relpath = \
 
624
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
625
                'repo')
 
626
        self.assertEqual(tree, None)
 
627
        self.assertEqual(branch, None)
 
628
        self.assertEqual(
 
629
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
 
630
            repo.bzrdir.transport.local_abspath('repository'))
 
631
        self.assertEqual(relpath, '')
 
632
 
 
633
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
 
634
        self.make_repository('shared', shared=True)
 
635
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
 
636
                                                force_new_tree=False)
 
637
        tree, branch, repo, relpath = \
 
638
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
639
                'shared/branch')
 
640
        self.assertEqual(tree, None)
 
641
        self.assertEqual(os.path.realpath('shared/branch'),
 
642
                         self.local_branch_path(branch))
 
643
        self.assertEqual(
 
644
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
 
645
            repo.bzrdir.transport.local_abspath('repository'))
 
646
        self.assertEqual(relpath, '')
 
647
 
 
648
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
 
649
        self.make_branch_and_tree('foo')
 
650
        self.build_tree(['foo/bar/'])
 
651
        tree, branch, repo, relpath = \
 
652
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
653
                'foo/bar')
 
654
        self.assertEqual(os.path.realpath('foo'),
 
655
                         os.path.realpath(tree.basedir))
 
656
        self.assertEqual(os.path.realpath('foo'),
 
657
                         self.local_branch_path(branch))
 
658
        self.assertEqual(
 
659
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
 
660
            repo.bzrdir.transport.local_abspath('repository'))
 
661
        self.assertEqual(relpath, 'bar')
 
662
 
 
663
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
 
664
        self.make_repository('bar')
 
665
        self.build_tree(['bar/baz/'])
 
666
        tree, branch, repo, relpath = \
 
667
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
668
                'bar/baz')
 
669
        self.assertEqual(tree, None)
 
670
        self.assertEqual(branch, None)
 
671
        self.assertEqual(
 
672
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
 
673
            repo.bzrdir.transport.local_abspath('repository'))
 
674
        self.assertEqual(relpath, 'baz')
 
675
 
 
676
    def test_open_containing_from_transport(self):
 
677
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
678
                          get_transport(self.get_readonly_url('')))
 
679
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
680
                          get_transport(self.get_readonly_url('g/p/q')))
 
681
        control = bzrdir.BzrDir.create(self.get_url())
 
682
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
683
            get_transport(self.get_readonly_url('')))
 
684
        self.assertEqual('', relpath)
 
685
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
686
            get_transport(self.get_readonly_url('g/p/q')))
 
687
        self.assertEqual('g/p/q', relpath)
 
688
 
 
689
    def test_open_containing_tree_or_branch(self):
 
690
        self.make_branch_and_tree('topdir')
 
691
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
692
            'topdir/foo')
 
693
        self.assertEqual(os.path.realpath('topdir'),
 
694
                         os.path.realpath(tree.basedir))
 
695
        self.assertEqual(os.path.realpath('topdir'),
 
696
                         self.local_branch_path(branch))
 
697
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
698
        self.assertEqual('foo', relpath)
 
699
        # opening from non-local should not return the tree
 
700
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
701
            self.get_readonly_url('topdir/foo'))
 
702
        self.assertEqual(None, tree)
 
703
        self.assertEqual('foo', relpath)
 
704
        # without a tree:
 
705
        self.make_branch('topdir/foo')
 
706
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
707
            'topdir/foo')
 
708
        self.assertIs(tree, None)
 
709
        self.assertEqual(os.path.realpath('topdir/foo'),
 
710
                         self.local_branch_path(branch))
 
711
        self.assertEqual('', relpath)
 
712
 
 
713
    def test_open_tree_or_branch(self):
 
714
        self.make_branch_and_tree('topdir')
 
715
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
 
716
        self.assertEqual(os.path.realpath('topdir'),
 
717
                         os.path.realpath(tree.basedir))
 
718
        self.assertEqual(os.path.realpath('topdir'),
 
719
                         self.local_branch_path(branch))
 
720
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
721
        # opening from non-local should not return the tree
 
722
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
 
723
            self.get_readonly_url('topdir'))
 
724
        self.assertEqual(None, tree)
 
725
        # without a tree:
 
726
        self.make_branch('topdir/foo')
 
727
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
 
728
        self.assertIs(tree, None)
 
729
        self.assertEqual(os.path.realpath('topdir/foo'),
 
730
                         self.local_branch_path(branch))
 
731
 
 
732
    def test_open_from_transport(self):
 
733
        # transport pointing at bzrdir should give a bzrdir with root transport
 
734
        # set to the given transport
 
735
        control = bzrdir.BzrDir.create(self.get_url())
 
736
        transport = get_transport(self.get_url())
 
737
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
 
738
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
 
739
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
 
740
 
 
741
    def test_open_from_transport_no_bzrdir(self):
 
742
        transport = get_transport(self.get_url())
 
743
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
744
                          transport)
 
745
 
 
746
    def test_open_from_transport_bzrdir_in_parent(self):
 
747
        control = bzrdir.BzrDir.create(self.get_url())
 
748
        transport = get_transport(self.get_url())
 
749
        transport.mkdir('subdir')
 
750
        transport = transport.clone('subdir')
 
751
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
752
                          transport)
 
753
 
 
754
    def test_sprout_recursive(self):
 
755
        tree = self.make_branch_and_tree('tree1',
 
756
                                         format='dirstate-with-subtree')
 
757
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
758
            format='dirstate-with-subtree')
 
759
        sub_tree.set_root_id('subtree-root')
 
760
        tree.add_reference(sub_tree)
 
761
        self.build_tree(['tree1/subtree/file'])
 
762
        sub_tree.add('file')
 
763
        tree.commit('Initial commit')
 
764
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
 
765
        tree2.lock_read()
 
766
        self.addCleanup(tree2.unlock)
 
767
        self.failUnlessExists('tree2/subtree/file')
 
768
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
 
769
 
 
770
    def test_cloning_metadir(self):
 
771
        """Ensure that cloning metadir is suitable"""
 
772
        bzrdir = self.make_bzrdir('bzrdir')
 
773
        bzrdir.cloning_metadir()
 
774
        branch = self.make_branch('branch', format='knit')
 
775
        format = branch.bzrdir.cloning_metadir()
 
776
        self.assertIsInstance(format.workingtree_format,
 
777
            workingtree.WorkingTreeFormat3)
 
778
 
 
779
    def test_sprout_recursive_treeless(self):
 
780
        tree = self.make_branch_and_tree('tree1',
 
781
            format='dirstate-with-subtree')
 
782
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
783
            format='dirstate-with-subtree')
 
784
        tree.add_reference(sub_tree)
 
785
        self.build_tree(['tree1/subtree/file'])
 
786
        sub_tree.add('file')
 
787
        tree.commit('Initial commit')
 
788
        tree.bzrdir.destroy_workingtree()
 
789
        repo = self.make_repository('repo', shared=True,
 
790
            format='dirstate-with-subtree')
 
791
        repo.set_make_working_trees(False)
 
792
        tree.bzrdir.sprout('repo/tree2')
 
793
        self.failUnlessExists('repo/tree2/subtree')
 
794
        self.failIfExists('repo/tree2/subtree/file')
 
795
 
 
796
    def make_foo_bar_baz(self):
 
797
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
 
798
        bar = self.make_branch('foo/bar').bzrdir
 
799
        baz = self.make_branch('baz').bzrdir
 
800
        return foo, bar, baz
 
801
 
 
802
    def test_find_bzrdirs(self):
 
803
        foo, bar, baz = self.make_foo_bar_baz()
 
804
        transport = get_transport(self.get_url())
 
805
        self.assertEqualBzrdirs([baz, foo, bar],
 
806
                                bzrdir.BzrDir.find_bzrdirs(transport))
 
807
 
 
808
    def test_find_bzrdirs_list_current(self):
 
809
        def list_current(transport):
 
810
            return [s for s in transport.list_dir('') if s != 'baz']
 
811
 
 
812
        foo, bar, baz = self.make_foo_bar_baz()
 
813
        transport = get_transport(self.get_url())
 
814
        self.assertEqualBzrdirs([foo, bar],
 
815
                                bzrdir.BzrDir.find_bzrdirs(transport,
 
816
                                    list_current=list_current))
 
817
 
 
818
 
 
819
    def test_find_bzrdirs_evaluate(self):
 
820
        def evaluate(bzrdir):
 
821
            try:
 
822
                repo = bzrdir.open_repository()
 
823
            except NoRepositoryPresent:
 
824
                return True, bzrdir.root_transport.base
 
825
            else:
 
826
                return False, bzrdir.root_transport.base
 
827
 
 
828
        foo, bar, baz = self.make_foo_bar_baz()
 
829
        transport = get_transport(self.get_url())
 
830
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
 
831
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
 
832
                                                         evaluate=evaluate)))
 
833
 
 
834
    def assertEqualBzrdirs(self, first, second):
 
835
        first = list(first)
 
836
        second = list(second)
 
837
        self.assertEqual(len(first), len(second))
 
838
        for x, y in zip(first, second):
 
839
            self.assertEqual(x.root_transport.base, y.root_transport.base)
 
840
 
 
841
    def test_find_branches(self):
 
842
        root = self.make_repository('', shared=True)
 
843
        foo, bar, baz = self.make_foo_bar_baz()
 
844
        qux = self.make_bzrdir('foo/qux')
 
845
        transport = get_transport(self.get_url())
 
846
        branches = bzrdir.BzrDir.find_branches(transport)
 
847
        self.assertEqual(baz.root_transport.base, branches[0].base)
 
848
        self.assertEqual(foo.root_transport.base, branches[1].base)
 
849
        self.assertEqual(bar.root_transport.base, branches[2].base)
 
850
 
 
851
        # ensure this works without a top-level repo
 
852
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
 
853
        self.assertEqual(foo.root_transport.base, branches[0].base)
 
854
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
855
 
 
856
 
 
857
class TestMeta1DirFormat(TestCaseWithTransport):
 
858
    """Tests specific to the meta1 dir format."""
 
859
 
 
860
    def test_right_base_dirs(self):
 
861
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
862
        t = dir.transport
 
863
        branch_base = t.clone('branch').base
 
864
        self.assertEqual(branch_base, dir.get_branch_transport(None).base)
 
865
        self.assertEqual(branch_base,
 
866
                         dir.get_branch_transport(bzrlib.branch.BzrBranchFormat5()).base)
 
867
        repository_base = t.clone('repository').base
 
868
        self.assertEqual(repository_base, dir.get_repository_transport(None).base)
 
869
        self.assertEqual(repository_base,
 
870
                         dir.get_repository_transport(weaverepo.RepositoryFormat7()).base)
 
871
        checkout_base = t.clone('checkout').base
 
872
        self.assertEqual(checkout_base, dir.get_workingtree_transport(None).base)
 
873
        self.assertEqual(checkout_base,
 
874
                         dir.get_workingtree_transport(workingtree.WorkingTreeFormat3()).base)
 
875
 
 
876
    def test_meta1dir_uses_lockdir(self):
 
877
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
 
878
        dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
 
879
        t = dir.transport
 
880
        self.assertIsDirectory('branch-lock', t)
 
881
 
 
882
    def test_comparison(self):
 
883
        """Equality and inequality behave properly.
 
884
 
 
885
        Metadirs should compare equal iff they have the same repo, branch and
 
886
        tree formats.
 
887
        """
 
888
        mydir = bzrdir.format_registry.make_bzrdir('knit')
 
889
        self.assertEqual(mydir, mydir)
 
890
        self.assertFalse(mydir != mydir)
 
891
        otherdir = bzrdir.format_registry.make_bzrdir('knit')
 
892
        self.assertEqual(otherdir, mydir)
 
893
        self.assertFalse(otherdir != mydir)
 
894
        otherdir2 = bzrdir.format_registry.make_bzrdir('dirstate-with-subtree')
 
895
        self.assertNotEqual(otherdir2, mydir)
 
896
        self.assertFalse(otherdir2 == mydir)
 
897
 
 
898
    def test_needs_conversion_different_working_tree(self):
 
899
        # meta1dirs need an conversion if any element is not the default.
 
900
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
901
        tree = self.make_branch_and_tree('tree', format='knit')
 
902
        self.assertTrue(tree.bzrdir.needs_format_conversion(
 
903
            new_format))
 
904
 
 
905
    def test_initialize_on_format_uses_smart_transport(self):
 
906
        self.setup_smart_server_with_call_log()
 
907
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
 
908
        transport = self.get_transport('target')
 
909
        transport.ensure_base()
 
910
        self.reset_smart_call_log()
 
911
        instance = new_format.initialize_on_transport(transport)
 
912
        self.assertIsInstance(instance, remote.RemoteBzrDir)
 
913
        rpc_count = len(self.hpss_calls)
 
914
        # This figure represent the amount of work to perform this use case. It
 
915
        # is entirely ok to reduce this number if a test fails due to rpc_count
 
916
        # being too low. If rpc_count increases, more network roundtrips have
 
917
        # become necessary for this use case. Please do not adjust this number
 
918
        # upwards without agreement from bzr's network support maintainers.
 
919
        self.assertEqual(2, rpc_count)
 
920
 
 
921
 
 
922
class TestFormat5(TestCaseWithTransport):
 
923
    """Tests specific to the version 5 bzrdir format."""
 
924
 
 
925
    def test_same_lockfiles_between_tree_repo_branch(self):
 
926
        # this checks that only a single lockfiles instance is created
 
927
        # for format 5 objects
 
928
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
929
        def check_dir_components_use_same_lock(dir):
 
930
            ctrl_1 = dir.open_repository().control_files
 
931
            ctrl_2 = dir.open_branch().control_files
 
932
            ctrl_3 = dir.open_workingtree()._control_files
 
933
            self.assertTrue(ctrl_1 is ctrl_2)
 
934
            self.assertTrue(ctrl_2 is ctrl_3)
 
935
        check_dir_components_use_same_lock(dir)
 
936
        # and if we open it normally.
 
937
        dir = bzrdir.BzrDir.open(self.get_url())
 
938
        check_dir_components_use_same_lock(dir)
 
939
 
 
940
    def test_can_convert(self):
 
941
        # format 5 dirs are convertable
 
942
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
943
        self.assertTrue(dir.can_convert_format())
 
944
 
 
945
    def test_needs_conversion(self):
 
946
        # format 5 dirs need a conversion if they are not the default,
 
947
        # and they aren't
 
948
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
949
        # don't need to convert it to itself
 
950
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
 
951
        # do need to convert it to the current default
 
952
        self.assertTrue(dir.needs_format_conversion(
 
953
            bzrdir.BzrDirFormat.get_default_format()))
 
954
 
 
955
 
 
956
class TestFormat6(TestCaseWithTransport):
 
957
    """Tests specific to the version 6 bzrdir format."""
 
958
 
 
959
    def test_same_lockfiles_between_tree_repo_branch(self):
 
960
        # this checks that only a single lockfiles instance is created
 
961
        # for format 6 objects
 
962
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
963
        def check_dir_components_use_same_lock(dir):
 
964
            ctrl_1 = dir.open_repository().control_files
 
965
            ctrl_2 = dir.open_branch().control_files
 
966
            ctrl_3 = dir.open_workingtree()._control_files
 
967
            self.assertTrue(ctrl_1 is ctrl_2)
 
968
            self.assertTrue(ctrl_2 is ctrl_3)
 
969
        check_dir_components_use_same_lock(dir)
 
970
        # and if we open it normally.
 
971
        dir = bzrdir.BzrDir.open(self.get_url())
 
972
        check_dir_components_use_same_lock(dir)
 
973
 
 
974
    def test_can_convert(self):
 
975
        # format 6 dirs are convertable
 
976
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
977
        self.assertTrue(dir.can_convert_format())
 
978
 
 
979
    def test_needs_conversion(self):
 
980
        # format 6 dirs need an conversion if they are not the default.
 
981
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
982
        self.assertTrue(dir.needs_format_conversion(
 
983
            bzrdir.BzrDirFormat.get_default_format()))
 
984
 
 
985
 
 
986
class NotBzrDir(bzrlib.bzrdir.BzrDir):
 
987
    """A non .bzr based control directory."""
 
988
 
 
989
    def __init__(self, transport, format):
 
990
        self._format = format
 
991
        self.root_transport = transport
 
992
        self.transport = transport.clone('.not')
 
993
 
 
994
 
 
995
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
 
996
    """A test class representing any non-.bzr based disk format."""
 
997
 
 
998
    def initialize_on_transport(self, transport):
 
999
        """Initialize a new .not dir in the base directory of a Transport."""
 
1000
        transport.mkdir('.not')
 
1001
        return self.open(transport)
 
1002
 
 
1003
    def open(self, transport):
 
1004
        """Open this directory."""
 
1005
        return NotBzrDir(transport, self)
 
1006
 
 
1007
    @classmethod
 
1008
    def _known_formats(self):
 
1009
        return set([NotBzrDirFormat()])
 
1010
 
 
1011
    @classmethod
 
1012
    def probe_transport(self, transport):
 
1013
        """Our format is present if the transport ends in '.not/'."""
 
1014
        if transport.has('.not'):
 
1015
            return NotBzrDirFormat()
 
1016
 
 
1017
 
 
1018
class TestNotBzrDir(TestCaseWithTransport):
 
1019
    """Tests for using the bzrdir api with a non .bzr based disk format.
 
1020
 
 
1021
    If/when one of these is in the core, we can let the implementation tests
 
1022
    verify this works.
 
1023
    """
 
1024
 
 
1025
    def test_create_and_find_format(self):
 
1026
        # create a .notbzr dir
 
1027
        format = NotBzrDirFormat()
 
1028
        dir = format.initialize(self.get_url())
 
1029
        self.assertIsInstance(dir, NotBzrDir)
 
1030
        # now probe for it.
 
1031
        bzrlib.bzrdir.BzrDirFormat.register_control_format(format)
 
1032
        try:
 
1033
            found = bzrlib.bzrdir.BzrDirFormat.find_format(
 
1034
                get_transport(self.get_url()))
 
1035
            self.assertIsInstance(found, NotBzrDirFormat)
 
1036
        finally:
 
1037
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(format)
 
1038
 
 
1039
    def test_included_in_known_formats(self):
 
1040
        bzrlib.bzrdir.BzrDirFormat.register_control_format(NotBzrDirFormat)
 
1041
        try:
 
1042
            formats = bzrlib.bzrdir.BzrDirFormat.known_formats()
 
1043
            for format in formats:
 
1044
                if isinstance(format, NotBzrDirFormat):
 
1045
                    return
 
1046
            self.fail("No NotBzrDirFormat in %s" % formats)
 
1047
        finally:
 
1048
            bzrlib.bzrdir.BzrDirFormat.unregister_control_format(NotBzrDirFormat)
 
1049
 
 
1050
 
 
1051
class NonLocalTests(TestCaseWithTransport):
 
1052
    """Tests for bzrdir static behaviour on non local paths."""
 
1053
 
 
1054
    def setUp(self):
 
1055
        super(NonLocalTests, self).setUp()
 
1056
        self.vfs_transport_factory = MemoryServer
 
1057
 
 
1058
    def test_create_branch_convenience(self):
 
1059
        # outside a repo the default convenience output is a repo+branch_tree
 
1060
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1061
        branch = bzrdir.BzrDir.create_branch_convenience(
 
1062
            self.get_url('foo'), format=format)
 
1063
        self.assertRaises(errors.NoWorkingTree,
 
1064
                          branch.bzrdir.open_workingtree)
 
1065
        branch.bzrdir.open_repository()
 
1066
 
 
1067
    def test_create_branch_convenience_force_tree_not_local_fails(self):
 
1068
        # outside a repo the default convenience output is a repo+branch_tree
 
1069
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1070
        self.assertRaises(errors.NotLocalUrl,
 
1071
            bzrdir.BzrDir.create_branch_convenience,
 
1072
            self.get_url('foo'),
 
1073
            force_new_tree=True,
 
1074
            format=format)
 
1075
        t = get_transport(self.get_url('.'))
 
1076
        self.assertFalse(t.has('foo'))
 
1077
 
 
1078
    def test_clone(self):
 
1079
        # clone into a nonlocal path works
 
1080
        format = bzrdir.format_registry.make_bzrdir('knit')
 
1081
        branch = bzrdir.BzrDir.create_branch_convenience('local',
 
1082
                                                         format=format)
 
1083
        branch.bzrdir.open_workingtree()
 
1084
        result = branch.bzrdir.clone(self.get_url('remote'))
 
1085
        self.assertRaises(errors.NoWorkingTree,
 
1086
                          result.open_workingtree)
 
1087
        result.open_branch()
 
1088
        result.open_repository()
 
1089
 
 
1090
    def test_checkout_metadir(self):
 
1091
        # checkout_metadir has reasonable working tree format even when no
 
1092
        # working tree is present
 
1093
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
 
1094
        my_bzrdir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
 
1095
        checkout_format = my_bzrdir.checkout_metadir()
 
1096
        self.assertIsInstance(checkout_format.workingtree_format,
 
1097
                              workingtree.WorkingTreeFormat3)
 
1098
 
 
1099
 
 
1100
class TestHTTPRedirections(object):
 
1101
    """Test redirection between two http servers.
 
1102
 
 
1103
    This MUST be used by daughter classes that also inherit from
 
1104
    TestCaseWithTwoWebservers.
 
1105
 
 
1106
    We can't inherit directly from TestCaseWithTwoWebservers or the
 
1107
    test framework will try to create an instance which cannot
 
1108
    run, its implementation being incomplete.
 
1109
    """
 
1110
 
 
1111
    def create_transport_readonly_server(self):
 
1112
        return http_utils.HTTPServerRedirecting()
 
1113
 
 
1114
    def create_transport_secondary_server(self):
 
1115
        return http_utils.HTTPServerRedirecting()
 
1116
 
 
1117
    def setUp(self):
 
1118
        super(TestHTTPRedirections, self).setUp()
 
1119
        # The redirections will point to the new server
 
1120
        self.new_server = self.get_readonly_server()
 
1121
        # The requests to the old server will be redirected
 
1122
        self.old_server = self.get_secondary_server()
 
1123
        # Configure the redirections
 
1124
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
1125
 
 
1126
    def test_loop(self):
 
1127
        # Both servers redirect to each other creating a loop
 
1128
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
1129
        # Starting from either server should loop
 
1130
        old_url = self._qualified_url(self.old_server.host,
 
1131
                                      self.old_server.port)
 
1132
        oldt = self._transport(old_url)
 
1133
        self.assertRaises(errors.NotBranchError,
 
1134
                          bzrdir.BzrDir.open_from_transport, oldt)
 
1135
        new_url = self._qualified_url(self.new_server.host,
 
1136
                                      self.new_server.port)
 
1137
        newt = self._transport(new_url)
 
1138
        self.assertRaises(errors.NotBranchError,
 
1139
                          bzrdir.BzrDir.open_from_transport, newt)
 
1140
 
 
1141
    def test_qualifier_preserved(self):
 
1142
        wt = self.make_branch_and_tree('branch')
 
1143
        old_url = self._qualified_url(self.old_server.host,
 
1144
                                      self.old_server.port)
 
1145
        start = self._transport(old_url).clone('branch')
 
1146
        bdir = bzrdir.BzrDir.open_from_transport(start)
 
1147
        # Redirection should preserve the qualifier, hence the transport class
 
1148
        # itself.
 
1149
        self.assertIsInstance(bdir.root_transport, type(start))
 
1150
 
 
1151
 
 
1152
class TestHTTPRedirections_urllib(TestHTTPRedirections,
 
1153
                                  http_utils.TestCaseWithTwoWebservers):
 
1154
    """Tests redirections for urllib implementation"""
 
1155
 
 
1156
    _transport = HttpTransport_urllib
 
1157
 
 
1158
    def _qualified_url(self, host, port):
 
1159
        return 'http+urllib://%s:%s' % (host, port)
 
1160
 
 
1161
 
 
1162
 
 
1163
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
 
1164
                                  TestHTTPRedirections,
 
1165
                                  http_utils.TestCaseWithTwoWebservers):
 
1166
    """Tests redirections for pycurl implementation"""
 
1167
 
 
1168
    def _qualified_url(self, host, port):
 
1169
        return 'http+pycurl://%s:%s' % (host, port)
 
1170
 
 
1171
 
 
1172
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
 
1173
                                  http_utils.TestCaseWithTwoWebservers):
 
1174
    """Tests redirections for the nosmart decorator"""
 
1175
 
 
1176
    _transport = NoSmartTransportDecorator
 
1177
 
 
1178
    def _qualified_url(self, host, port):
 
1179
        return 'nosmart+http://%s:%s' % (host, port)
 
1180
 
 
1181
 
 
1182
class TestHTTPRedirections_readonly(TestHTTPRedirections,
 
1183
                                    http_utils.TestCaseWithTwoWebservers):
 
1184
    """Tests redirections for readonly decoratror"""
 
1185
 
 
1186
    _transport = ReadonlyTransportDecorator
 
1187
 
 
1188
    def _qualified_url(self, host, port):
 
1189
        return 'readonly+http://%s:%s' % (host, port)
 
1190
 
 
1191
 
 
1192
class TestDotBzrHidden(TestCaseWithTransport):
 
1193
 
 
1194
    ls = ['ls']
 
1195
    if sys.platform == 'win32':
 
1196
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
 
1197
 
 
1198
    def get_ls(self):
 
1199
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
 
1200
            stderr=subprocess.PIPE)
 
1201
        out, err = f.communicate()
 
1202
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
 
1203
                         % (self.ls, err))
 
1204
        return out.splitlines()
 
1205
 
 
1206
    def test_dot_bzr_hidden(self):
 
1207
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1208
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1209
        b = bzrdir.BzrDir.create('.')
 
1210
        self.build_tree(['a'])
 
1211
        self.assertEquals(['a'], self.get_ls())
 
1212
 
 
1213
    def test_dot_bzr_hidden_with_url(self):
 
1214
        if sys.platform == 'win32' and not win32utils.has_win32file:
 
1215
            raise TestSkipped('unable to make file hidden without pywin32 library')
 
1216
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
 
1217
        self.build_tree(['a'])
 
1218
        self.assertEquals(['a'], self.get_ls())
 
1219
 
 
1220
 
 
1221
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
 
1222
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
 
1223
 
 
1224
    def _open(self, transport):
 
1225
        return _TestBzrDir(transport, self)
 
1226
 
 
1227
 
 
1228
class _TestBzrDir(bzrdir.BzrDirMeta1):
 
1229
    """Test BzrDir implementation for TestBzrDirSprout.
 
1230
 
 
1231
    When created a _TestBzrDir already has repository and a branch.  The branch
 
1232
    is a test double as well.
 
1233
    """
 
1234
 
 
1235
    def __init__(self, *args, **kwargs):
 
1236
        super(_TestBzrDir, self).__init__(*args, **kwargs)
 
1237
        self.test_branch = _TestBranch()
 
1238
        self.test_branch.repository = self.create_repository()
 
1239
 
 
1240
    def open_branch(self, unsupported=False):
 
1241
        return self.test_branch
 
1242
 
 
1243
    def cloning_metadir(self, require_stacking=False):
 
1244
        return _TestBzrDirFormat()
 
1245
 
 
1246
 
 
1247
class _TestBranchFormat(bzrlib.branch.BranchFormat):
 
1248
    """Test Branch format for TestBzrDirSprout."""
 
1249
 
 
1250
 
 
1251
class _TestBranch(bzrlib.branch.Branch):
 
1252
    """Test Branch implementation for TestBzrDirSprout."""
 
1253
 
 
1254
    def __init__(self, *args, **kwargs):
 
1255
        self._format = _TestBranchFormat()
 
1256
        super(_TestBranch, self).__init__(*args, **kwargs)
 
1257
        self.calls = []
 
1258
        self._parent = None
 
1259
 
 
1260
    def sprout(self, *args, **kwargs):
 
1261
        self.calls.append('sprout')
 
1262
        return _TestBranch()
 
1263
 
 
1264
    def copy_content_into(self, destination, revision_id=None):
 
1265
        self.calls.append('copy_content_into')
 
1266
 
 
1267
    def get_parent(self):
 
1268
        return self._parent
 
1269
 
 
1270
    def set_parent(self, parent):
 
1271
        self._parent = parent
 
1272
 
 
1273
 
 
1274
class TestBzrDirSprout(TestCaseWithMemoryTransport):
 
1275
 
 
1276
    def test_sprout_uses_branch_sprout(self):
 
1277
        """BzrDir.sprout calls Branch.sprout.
 
1278
 
 
1279
        Usually, BzrDir.sprout should delegate to the branch's sprout method
 
1280
        for part of the work.  This allows the source branch to control the
 
1281
        choice of format for the new branch.
 
1282
 
 
1283
        There are exceptions, but this tests avoids them:
 
1284
          - if there's no branch in the source bzrdir,
 
1285
          - or if the stacking has been requested and the format needs to be
 
1286
            overridden to satisfy that.
 
1287
        """
 
1288
        # Make an instrumented bzrdir.
 
1289
        t = self.get_transport('source')
 
1290
        t.ensure_base()
 
1291
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
 
1292
        # The instrumented bzrdir has a test_branch attribute that logs calls
 
1293
        # made to the branch contained in that bzrdir.  Initially the test
 
1294
        # branch exists but no calls have been made to it.
 
1295
        self.assertEqual([], source_bzrdir.test_branch.calls)
 
1296
 
 
1297
        # Sprout the bzrdir
 
1298
        target_url = self.get_url('target')
 
1299
        result = source_bzrdir.sprout(target_url, recurse='no')
 
1300
 
 
1301
        # The bzrdir called the branch's sprout method.
 
1302
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
 
1303
 
 
1304
    def test_sprout_parent(self):
 
1305
        grandparent_tree = self.make_branch('grandparent')
 
1306
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
 
1307
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
 
1308
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
 
1309
 
 
1310
 
 
1311
class TestBzrDirHooks(TestCaseWithMemoryTransport):
 
1312
 
 
1313
    def test_pre_open_called(self):
 
1314
        calls = []
 
1315
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
 
1316
        transport = self.get_transport('foo')
 
1317
        url = transport.base
 
1318
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
 
1319
        self.assertEqual([transport.base], [t.base for t in calls])
 
1320
 
 
1321
    def test_pre_open_actual_exceptions_raised(self):
 
1322
        count = [0]
 
1323
        def fail_once(transport):
 
1324
            count[0] += 1
 
1325
            if count[0] == 1:
 
1326
                raise errors.BzrError("fail")
 
1327
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
 
1328
        transport = self.get_transport('foo')
 
1329
        url = transport.base
 
1330
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
 
1331
        self.assertEqual('fail', err._preformatted_string)