~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

cleanup of globster code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/per_bzr_dir.
 
20
"""
 
21
 
 
22
import os
 
23
import subprocess
 
24
import sys
 
25
 
 
26
from bzrlib import (
 
27
    branch,
 
28
    bzrdir,
 
29
    errors,
 
30
    help_topics,
 
31
    repository,
 
32
    osutils,
 
33
    remote,
 
34
    urlutils,
 
35
    win32utils,
 
36
    workingtree,
 
37
    )
 
38
import bzrlib.branch
 
39
from bzrlib.errors import (NotBranchError,
 
40
                           NoColocatedBranchSupport,
 
41
                           UnknownFormatError,
 
42
                           UnsupportedFormatError,
 
43
                           )
 
44
from bzrlib.tests import (
 
45
    TestCase,
 
46
    TestCaseWithMemoryTransport,
 
47
    TestCaseWithTransport,
 
48
    TestSkipped,
 
49
    )
 
50
from bzrlib.tests import(
 
51
    http_server,
 
52
    http_utils,
 
53
    )
 
54
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
55
from bzrlib.transport import (
 
56
    get_transport,
 
57
    memory,
 
58
    pathfilter,
 
59
    )
 
60
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
61
from bzrlib.transport.nosmart import NoSmartTransportDecorator
 
62
from bzrlib.transport.readonly import ReadonlyTransportDecorator
 
63
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
64
 
 
65
 
 
66
class TestDefaultFormat(TestCase):
    """Tests for reading and replacing the default BzrDirFormat."""

    def test_get_set_default_format(self):
        """Replacing the default format changes what BzrDir.create builds."""
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # The stock default is expected to be the metadir format.
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
        # creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            # Put the previous default back even if an assertion failed.
            bzrdir.BzrDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
80
 
 
81
 
 
82
class TestFormatRegistry(TestCase):
    """Tests for BzrDirFormatRegistry: lookup, help text and aliases."""

    def make_format_registry(self):
        """Build a private registry exercising each registration flavour.

        The registry holds a directly registered format ('weave'), a lazily
        registered one ('lazy'), two metadir entries ('knit', which is made
        the default, and the experimental 'branch6') and hidden variants.

        :return: the populated BzrDirFormatRegistry.
        """
        registry = bzrdir.BzrDirFormatRegistry()
        weave_help = ('Pre-0.8 format.  Slower and does not support checkouts'
                      ' or shared repositories')
        experimental_help = (
            'Experimental successor to knit.  Use at your own risk.')
        registry.register('weave', bzrdir.BzrDirFormat6, weave_help,
                          deprecated=True)
        registry.register_lazy('lazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               'Format registered lazily', deprecated=True)
        registry.register_metadir(
            'knit',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
            'Format using knits',
            )
        registry.set_default('knit')
        registry.register_metadir(
            'branch6',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
            experimental_help,
            branch_format='bzrlib.branch.BzrBranchFormat6',
            experimental=True)
        registry.register_metadir(
            'hidden format',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
            experimental_help,
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
        registry.register('hiddenweave', bzrdir.BzrDirFormat6, weave_help,
                          hidden=True)
        registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               'Format registered lazily', deprecated=True,
                               hidden=True)
        return registry

    def test_format_registry(self):
        """make_bzrdir returns the format registered under each key."""
        registry = self.make_format_registry()
        for key in ('lazy', 'weave'):
            self.assertIsInstance(registry.make_bzrdir(key),
                                  bzrdir.BzrDirFormat6)
        # 'default' resolves to 'knit' because of set_default('knit').
        for key in ('default', 'knit'):
            made = registry.make_bzrdir(key)
            self.assertIsInstance(made.repository_format,
                                  knitrepo.RepositoryFormatKnit1)
        made = registry.make_bzrdir('branch6')
        self.assertIsInstance(made.get_branch_format(),
                              bzrlib.branch.BzrBranchFormat6)

    def test_get_help(self):
        """get_help returns the help string given at registration time."""
        registry = self.make_format_registry()
        expected_help = [
            ('lazy', 'Format registered lazily'),
            ('knit', 'Format using knits'),
            ('default', 'Format using knits'),
            ('weave', 'Pre-0.8 format.  Slower and does not support'
                      ' checkouts or shared repositories'),
            ]
        for key, help_text in expected_help:
            self.assertEqual(help_text, registry.get_help(key))

    def test_help_topic(self):
        """help_topic lists formats by category and omits hidden ones."""
        topics = help_topics.HelpTopicRegistry()
        registry = self.make_format_registry()
        topics.register('current-formats', registry.help_topic,
                        'Current formats')
        topics.register('other-formats', registry.help_topic,
                        'Other formats')
        new = topics.get_detail('current-formats')
        rest = topics.get_detail('other-formats')
        experimental, deprecated = rest.split('Deprecated formats')
        self.assertContainsRe(new, 'formats-help')
        # The parentheses are regex-escaped with an explicit double backslash
        # so the literals do not rely on Python passing an unknown escape
        # sequence through unchanged.
        self.assertContainsRe(new,
                ':knit:\n    \\(native\\) \\(default\\) Format using knits\n')
        self.assertContainsRe(experimental,
                ':branch6:\n    \\(native\\) Experimental successor to knit')
        self.assertContainsRe(deprecated,
                ':lazy:\n    \\(native\\) Format registered lazily\n')
        self.assertNotContainsRe(new, 'hidden')

    def test_set_default_repository(self):
        """set_default_repository re-points 'default' at the chosen format."""
        default_factory = bzrdir.format_registry.get('default')
        # Find the real key currently behind 'default' so it can be restored.
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
                       if v == default_factory and k != 'default'][0]
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
        try:
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
                          bzrdir.format_registry.get('default'))
            self.assertIs(
                repository.RepositoryFormat.get_default_format().__class__,
                knitrepo.RepositoryFormatKnit3)
        finally:
            # The global registry is shared, so always restore the default.
            bzrdir.format_registry.set_default_repository(old_default)

    def test_aliases(self):
        """aliases() reports only the keys registered with alias=True."""
        a_registry = bzrdir.BzrDirFormatRegistry()
        weave_help = ('Pre-0.8 format.  Slower and does not support checkouts'
                      ' or shared repositories')
        a_registry.register('weave', bzrdir.BzrDirFormat6, weave_help,
                            deprecated=True)
        a_registry.register('weavealias', bzrdir.BzrDirFormat6, weave_help,
                            deprecated=True, alias=True)
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
 
185
 
 
186
 
 
187
class SampleBranch(bzrlib.branch.Branch):
    """Stand-in Branch used by these tests; it only records its bzrdir."""

    def __init__(self, dir):
        # Only the owning control directory is stored; the base class
        # initialiser is not called.
        self.bzrdir = dir
 
192
 
 
193
 
 
194
class SampleRepository(bzrlib.repository.Repository):
    """Stand-in Repository used by these tests; it only records its bzrdir."""

    def __init__(self, dir):
        # Only the owning control directory is stored; the base class
        # initialiser is not called.
        self.bzrdir = dir
 
199
 
 
200
 
 
201
class SampleBzrDir(bzrdir.BzrDir):
    """Instrumented BzrDir whose factory methods return recognisable dummies.

    Tests use the fixed return values to check that the static BzrDir
    helpers dispatched to this implementation.
    """

    def create_repository(self, shared=False):
        """See BzrDir.create_repository; returns a marker string."""
        return "A repository"

    def open_repository(self):
        """See BzrDir.open_repository; returns a SampleRepository."""
        return SampleRepository(self)

    def create_branch(self, name=None):
        """See BzrDir.create_branch.

        :raises NoColocatedBranchSupport: if a branch name is supplied.
        """
        if name is None:
            return SampleBranch(self)
        raise NoColocatedBranchSupport(self)

    def create_workingtree(self):
        """See BzrDir.create_workingtree; returns a marker string."""
        return "A tree"
 
221
 
 
222
 
 
223
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
    """A sample control directory format.

    The format can be initialized but reports itself as unsupported, which
    lets tests exercise both the normal open path and the one used for
    unsupported formats.
    """

    def get_format_string(self):
        """See BzrDirFormat.get_format_string()."""
        return "Sample .bzr dir format."

    def initialize_on_transport(self, t):
        """Create a .bzr directory on ``t`` carrying this format's marker.

        :return: a SampleBzrDir bound to ``t`` and this format.
        """
        t.mkdir('.bzr')
        format_marker = self.get_format_string()
        t.put_bytes('.bzr/branch-format', format_marker)
        return SampleBzrDir(t, self)

    def is_supported(self):
        """Report this format as unsupported."""
        return False

    def open(self, transport, _found=None):
        """Return a fixed marker string instead of a real BzrDir."""
        return "opened branch."
 
245
 
 
246
 
 
247
class TestBzrDirFormat(TestCaseWithTransport):
    """Tests for the BzrDirFormat facility."""

    def test_find_format(self):
        """find_format identifies the format a control dir was made with."""
        # Initialise one control directory per known format, each in its own
        # subdirectory, and check that probing finds the same format class.
        t = get_transport(self.get_url())
        self.build_tree(["foo/", "bar/"], transport=t)

        def check_format(format, url):
            format.initialize(url)
            found_format = bzrdir.BzrDirFormat.find_format(get_transport(url))
            self.assertIsInstance(found_format, format.__class__)

        check_format(bzrdir.BzrDirFormat5(), "foo")
        check_format(bzrdir.BzrDirFormat6(), "bar")

    def test_find_format_nothing_there(self):
        """Probing a location without a control dir raises NotBranchError."""
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_find_format_unknown_format(self):
        """An unrecognised branch-format marker raises UnknownFormatError."""
        t = get_transport(self.get_url())
        t.mkdir('.bzr')
        # An empty marker file matches no registered format string.
        t.put_bytes('.bzr/branch-format', '')
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_register_unregister_format(self):
        """Registering a format makes it discoverable; unregistering undoes it."""
        format = SampleBzrDirFormat()
        url = self.get_url()
        # make a bzrdir
        format.initialize(url)
        # register a format for it.
        bzrdir.BzrDirFormat.register_format(format)
        try:
            # which bzrdir.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
            # which bzrdir.open_containing will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              bzrdir.BzrDir.open_containing, url)
            # but open_unsupported will work
            t = get_transport(url)
            self.assertEqual(format.open(t),
                             bzrdir.BzrDir.open_unsupported(url))
        finally:
            # Unregister even when an assertion above fails, so the sample
            # format does not stay registered for later tests.
            bzrdir.BzrDirFormat.unregister_format(format)
        # now open_unsupported should fail too.
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDir.open_unsupported, url)

    def test_create_branch_and_repo_uses_default(self):
        """create_branch_and_repo builds its branch via the given format."""
        format = SampleBzrDirFormat()
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                      format=format)
        self.assertIsInstance(branch, SampleBranch)

    def test_create_branch_and_repo_under_shared(self):
        """Inside a shared repo the new branch uses the shared repository."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(
            self.get_url('child'), format=format)
        # No repository of its own was created in the child control dir.
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_and_repo_under_shared_force_new(self):
        """force_new_repo makes a new repo even inside a shared repo."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                      force_new_repo=True,
                                                      format=format)
        branch.bzrdir.open_repository()

    def test_create_standalone_working_tree(self):
        """create_standalone_workingtree rejects non-local URLs."""
        format = SampleBzrDirFormat()
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url(), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
                                                           format=format)
        self.assertEqual('A tree', tree)

    def test_create_standalone_working_tree_under_shared_repo(self):
        """create_standalone_workingtree always makes a repo of its own."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url('child'), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
                                                           format=format)
        tree.bzrdir.open_repository()

    def test_create_branch_convenience(self):
        """Outside a repo the convenience output is repo + branch + tree."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_possible_transports(self):
        """Check that the optional 'possible_transports' is recognized"""
        format = bzrdir.format_registry.make_bzrdir('knit')
        t = self.get_transport()
        branch = bzrdir.BzrDir.create_branch_convenience(
            '.', format=format, possible_transports=[t])
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_root(self):
        """Creating a branch at the root of a fs should work."""
        self.vfs_transport_factory = memory.MemoryServer
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_under_shared_repo(self):
        """Inside a shared repo the output is a branch following tree policy."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
        """force_new_tree=False overrides the shared repo's tree policy."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_tree=False, format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
        """A shared repo set to make no trees yields a tree-less branch."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
        """force_new_tree=True overrides a no-trees shared repo policy."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_tree=True, format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
        """force_new_repo gives repo + branch + tree inside a shared repo."""
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_repo=True, format=format)
        branch.bzrdir.open_repository()
        branch.bzrdir.open_workingtree()
 
430
 
 
431
 
 
432
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
    """Tests for repository acquisition and default stacking policies."""

    def test_acquire_repository_standalone(self):
        """The default acquisition policy should create a standalone branch."""
        my_bzrdir = self.make_bzrdir('.')
        repo_policy = my_bzrdir.determine_repository_policy()
        repo, is_new = repo_policy.acquire_repository()
        self.assertEqual(repo.bzrdir.root_transport.base,
                         my_bzrdir.root_transport.base)
        self.assertFalse(repo.is_shared())

    def test_determine_stacking_policy(self):
        """A child picks up the parent's default stack-on URL."""
        parent_bzrdir = self.make_bzrdir('.')
        child_bzrdir = self.make_bzrdir('child')
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
        repo_policy = child_bzrdir.determine_repository_policy()
        self.assertEqual('http://example.org', repo_policy._stack_on)

    def test_determine_stacking_policy_relative(self):
        """A relative stack-on value is kept, with the parent as its base."""
        parent_bzrdir = self.make_bzrdir('.')
        child_bzrdir = self.make_bzrdir('child')
        parent_bzrdir.get_config().set_default_stack_on('child2')
        repo_policy = child_bzrdir.determine_repository_policy()
        self.assertEqual('child2', repo_policy._stack_on)
        self.assertEqual(parent_bzrdir.root_transport.base,
                         repo_policy._stack_on_pwd)

    def prepare_default_stacking(self, child_format='1.6'):
        """Set up a parent dir whose default stacking points at 'child'.

        :param child_format: format name used to create the 'child' branch.
        :return: (child_branch, new_child_transport), where the transport is
            the parent's transport cloned to 'child2'.
        """
        parent_bzrdir = self.make_bzrdir('.')
        child_branch = self.make_branch('child', format=child_format)
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
        new_child_transport = parent_bzrdir.transport.clone('child2')
        return child_branch, new_child_transport

    def test_clone_on_transport_obeys_stacking_policy(self):
        """clone_on_transport stacks the clone per the default policy."""
        child_branch, new_child_transport = self.prepare_default_stacking()
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
        self.assertEqual(child_branch.base,
                         new_child.open_branch().get_stacked_on_url())

    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
        """Cloning under a stacking policy copes with this format mix.

        There are no assertions; the test passes if the clone does not raise.
        """
        # Make stackable source branch with an unstackable repo format.
        source_bzrdir = self.make_bzrdir('source')
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
        bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
        # Make a directory with a default stacking policy
        parent_bzrdir = self.make_bzrdir('parent')
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
        # Clone source into directory
        source_bzrdir.clone(self.get_url('parent/target'))

    def test_sprout_obeys_stacking_policy(self):
        """sprout stacks the new branch per the default policy."""
        child_branch, new_child_transport = self.prepare_default_stacking()
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
        self.assertEqual(child_branch.base,
                         new_child.open_branch().get_stacked_on_url())

    def test_clone_ignores_policy_for_unsupported_formats(self):
        """A clone from a 'pack-0.92' source is left unstacked."""
        child_branch, new_child_transport = self.prepare_default_stacking(
            child_format='pack-0.92')
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
        self.assertRaises(errors.UnstackableBranchFormat,
                          new_child.open_branch().get_stacked_on_url)

    def test_sprout_ignores_policy_for_unsupported_formats(self):
        """A sprout from a 'pack-0.92' source is left unstacked."""
        child_branch, new_child_transport = self.prepare_default_stacking(
            child_format='pack-0.92')
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
        self.assertRaises(errors.UnstackableBranchFormat,
                          new_child.open_branch().get_stacked_on_url)

    def test_sprout_upgrades_format_if_stacked_specified(self):
        """sprout(stacked=True) upgrades to a format with external lookups."""
        child_branch, new_child_transport = self.prepare_default_stacking(
            child_format='pack-0.92')
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
                                               stacked=True)
        self.assertEqual(child_branch.bzrdir.root_transport.base,
                         new_child.open_branch().get_stacked_on_url())
        repo = new_child.open_repository()
        self.assertTrue(repo._format.supports_external_lookups)
        self.assertFalse(repo.supports_rich_root())

    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
        """An explicit stacked_on upgrades the clone's repository format."""
        child_branch, new_child_transport = self.prepare_default_stacking(
            child_format='pack-0.92')
        new_child = child_branch.bzrdir.clone_on_transport(
            new_child_transport,
            stacked_on=child_branch.bzrdir.root_transport.base)
        self.assertEqual(child_branch.bzrdir.root_transport.base,
                         new_child.open_branch().get_stacked_on_url())
        repo = new_child.open_repository()
        self.assertTrue(repo._format.supports_external_lookups)
        self.assertFalse(repo.supports_rich_root())

    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
        """A stacked sprout of a rich-root source stays rich-root."""
        child_branch, new_child_transport = self.prepare_default_stacking(
            child_format='rich-root-pack')
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
                                               stacked=True)
        repo = new_child.open_repository()
        self.assertTrue(repo._format.supports_external_lookups)
        self.assertTrue(repo.supports_rich_root())

    def test_add_fallback_repo_handles_absolute_urls(self):
        """_add_fallback accepts an absolute stack-on URL.

        There are no assertions; the test passes if no exception is raised.
        """
        stack_on = self.make_branch('stack_on', format='1.6')
        repo = self.make_repository('repo', format='1.6')
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
        policy._add_fallback(repo)

    def test_add_fallback_repo_handles_relative_urls(self):
        """_add_fallback accepts a stack-on URL of '.' plus a base URL.

        There are no assertions; the test passes if no exception is raised.
        """
        stack_on = self.make_branch('stack_on', format='1.6')
        repo = self.make_repository('repo', format='1.6')
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
        policy._add_fallback(repo)

    def test_configure_relative_branch_stacking_url(self):
        """configure_branch records the stacking URL relative to the branch."""
        stack_on = self.make_branch('stack_on', format='1.6')
        stacked = self.make_branch('stack_on/stacked', format='1.6')
        policy = bzrdir.UseExistingRepository(stacked.repository,
                                              '.', stack_on.base)
        policy.configure_branch(stacked)
        self.assertEqual('..', stacked.get_stacked_on_url())

    def test_relative_branch_stacking_to_absolute(self):
        """With a readonly-URL base, the stored stacking URL is that URL."""
        stack_on = self.make_branch('stack_on', format='1.6')
        stacked = self.make_branch('stack_on/stacked', format='1.6')
        policy = bzrdir.UseExistingRepository(
            stacked.repository, '.', self.get_readonly_url('stack_on'))
        policy.configure_branch(stacked)
        self.assertEqual(self.get_readonly_url('stack_on'),
                         stacked.get_stacked_on_url())
 
564
 
 
565
 
 
566
class ChrootedTests(TestCaseWithTransport):
 
567
    """A support class that provides readonly urls outside the local namespace.
 
568
 
 
569
    This is done by checking if self.vfs_transport_factory is MemoryServer; if it
 
570
    is then we are chrooted already, if it is not then an HttpServer is used
 
571
    for readonly urls.
 
572
    """
 
573
 
 
574
    def setUp(self):
 
575
        super(ChrootedTests, self).setUp()
 
576
        if not self.vfs_transport_factory == memory.MemoryServer:
 
577
            self.transport_readonly_server = http_server.HttpServer
 
578
 
 
579
    def local_branch_path(self, branch):
 
580
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
581
 
 
582
    def test_open_containing(self):
 
583
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
584
                          self.get_readonly_url(''))
 
585
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
 
586
                          self.get_readonly_url('g/p/q'))
 
587
        control = bzrdir.BzrDir.create(self.get_url())
 
588
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url(''))
 
589
        self.assertEqual('', relpath)
 
590
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
 
591
        self.assertEqual('g/p/q', relpath)
 
592
 
 
593
    def test_open_containing_tree_branch_or_repository_empty(self):
 
594
        self.assertRaises(errors.NotBranchError,
 
595
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
 
596
            self.get_readonly_url(''))
 
597
 
 
598
    def test_open_containing_tree_branch_or_repository_all(self):
 
599
        self.make_branch_and_tree('topdir')
 
600
        tree, branch, repo, relpath = \
 
601
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
602
                'topdir/foo')
 
603
        self.assertEqual(os.path.realpath('topdir'),
 
604
                         os.path.realpath(tree.basedir))
 
605
        self.assertEqual(os.path.realpath('topdir'),
 
606
                         self.local_branch_path(branch))
 
607
        self.assertEqual(
 
608
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
 
609
            repo.bzrdir.transport.local_abspath('repository'))
 
610
        self.assertEqual(relpath, 'foo')
 
611
 
 
612
    def test_open_containing_tree_branch_or_repository_no_tree(self):
 
613
        self.make_branch('branch')
 
614
        tree, branch, repo, relpath = \
 
615
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
616
                'branch/foo')
 
617
        self.assertEqual(tree, None)
 
618
        self.assertEqual(os.path.realpath('branch'),
 
619
                         self.local_branch_path(branch))
 
620
        self.assertEqual(
 
621
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
 
622
            repo.bzrdir.transport.local_abspath('repository'))
 
623
        self.assertEqual(relpath, 'foo')
 
624
 
 
625
    def test_open_containing_tree_branch_or_repository_repo(self):
 
626
        self.make_repository('repo')
 
627
        tree, branch, repo, relpath = \
 
628
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
629
                'repo')
 
630
        self.assertEqual(tree, None)
 
631
        self.assertEqual(branch, None)
 
632
        self.assertEqual(
 
633
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
 
634
            repo.bzrdir.transport.local_abspath('repository'))
 
635
        self.assertEqual(relpath, '')
 
636
 
 
637
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
 
638
        self.make_repository('shared', shared=True)
 
639
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
 
640
                                                force_new_tree=False)
 
641
        tree, branch, repo, relpath = \
 
642
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
643
                'shared/branch')
 
644
        self.assertEqual(tree, None)
 
645
        self.assertEqual(os.path.realpath('shared/branch'),
 
646
                         self.local_branch_path(branch))
 
647
        self.assertEqual(
 
648
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
 
649
            repo.bzrdir.transport.local_abspath('repository'))
 
650
        self.assertEqual(relpath, '')
 
651
 
 
652
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
 
653
        self.make_branch_and_tree('foo')
 
654
        self.build_tree(['foo/bar/'])
 
655
        tree, branch, repo, relpath = \
 
656
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
657
                'foo/bar')
 
658
        self.assertEqual(os.path.realpath('foo'),
 
659
                         os.path.realpath(tree.basedir))
 
660
        self.assertEqual(os.path.realpath('foo'),
 
661
                         self.local_branch_path(branch))
 
662
        self.assertEqual(
 
663
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
 
664
            repo.bzrdir.transport.local_abspath('repository'))
 
665
        self.assertEqual(relpath, 'bar')
 
666
 
 
667
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
 
668
        self.make_repository('bar')
 
669
        self.build_tree(['bar/baz/'])
 
670
        tree, branch, repo, relpath = \
 
671
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
 
672
                'bar/baz')
 
673
        self.assertEqual(tree, None)
 
674
        self.assertEqual(branch, None)
 
675
        self.assertEqual(
 
676
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
 
677
            repo.bzrdir.transport.local_abspath('repository'))
 
678
        self.assertEqual(relpath, 'baz')
 
679
 
 
680
    def test_open_containing_from_transport(self):
 
681
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
682
                          get_transport(self.get_readonly_url('')))
 
683
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
 
684
                          get_transport(self.get_readonly_url('g/p/q')))
 
685
        control = bzrdir.BzrDir.create(self.get_url())
 
686
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
687
            get_transport(self.get_readonly_url('')))
 
688
        self.assertEqual('', relpath)
 
689
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
 
690
            get_transport(self.get_readonly_url('g/p/q')))
 
691
        self.assertEqual('g/p/q', relpath)
 
692
 
 
693
    def test_open_containing_tree_or_branch(self):
 
694
        self.make_branch_and_tree('topdir')
 
695
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
696
            'topdir/foo')
 
697
        self.assertEqual(os.path.realpath('topdir'),
 
698
                         os.path.realpath(tree.basedir))
 
699
        self.assertEqual(os.path.realpath('topdir'),
 
700
                         self.local_branch_path(branch))
 
701
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
702
        self.assertEqual('foo', relpath)
 
703
        # opening from non-local should not return the tree
 
704
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
705
            self.get_readonly_url('topdir/foo'))
 
706
        self.assertEqual(None, tree)
 
707
        self.assertEqual('foo', relpath)
 
708
        # without a tree:
 
709
        self.make_branch('topdir/foo')
 
710
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
 
711
            'topdir/foo')
 
712
        self.assertIs(tree, None)
 
713
        self.assertEqual(os.path.realpath('topdir/foo'),
 
714
                         self.local_branch_path(branch))
 
715
        self.assertEqual('', relpath)
 
716
 
 
717
    def test_open_tree_or_branch(self):
 
718
        self.make_branch_and_tree('topdir')
 
719
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
 
720
        self.assertEqual(os.path.realpath('topdir'),
 
721
                         os.path.realpath(tree.basedir))
 
722
        self.assertEqual(os.path.realpath('topdir'),
 
723
                         self.local_branch_path(branch))
 
724
        self.assertIs(tree.bzrdir, branch.bzrdir)
 
725
        # opening from non-local should not return the tree
 
726
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
 
727
            self.get_readonly_url('topdir'))
 
728
        self.assertEqual(None, tree)
 
729
        # without a tree:
 
730
        self.make_branch('topdir/foo')
 
731
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
 
732
        self.assertIs(tree, None)
 
733
        self.assertEqual(os.path.realpath('topdir/foo'),
 
734
                         self.local_branch_path(branch))
 
735
 
 
736
    def test_open_from_transport(self):
 
737
        # transport pointing at bzrdir should give a bzrdir with root transport
 
738
        # set to the given transport
 
739
        control = bzrdir.BzrDir.create(self.get_url())
 
740
        transport = get_transport(self.get_url())
 
741
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
 
742
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
 
743
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
 
744
 
 
745
    def test_open_from_transport_no_bzrdir(self):
 
746
        transport = get_transport(self.get_url())
 
747
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
748
                          transport)
 
749
 
 
750
    def test_open_from_transport_bzrdir_in_parent(self):
 
751
        control = bzrdir.BzrDir.create(self.get_url())
 
752
        transport = get_transport(self.get_url())
 
753
        transport.mkdir('subdir')
 
754
        transport = transport.clone('subdir')
 
755
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
 
756
                          transport)
 
757
 
 
758
    def test_sprout_recursive(self):
 
759
        tree = self.make_branch_and_tree('tree1',
 
760
                                         format='dirstate-with-subtree')
 
761
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
762
            format='dirstate-with-subtree')
 
763
        sub_tree.set_root_id('subtree-root')
 
764
        tree.add_reference(sub_tree)
 
765
        self.build_tree(['tree1/subtree/file'])
 
766
        sub_tree.add('file')
 
767
        tree.commit('Initial commit')
 
768
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
 
769
        tree2.lock_read()
 
770
        self.addCleanup(tree2.unlock)
 
771
        self.failUnlessExists('tree2/subtree/file')
 
772
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
 
773
 
 
774
    def test_cloning_metadir(self):
 
775
        """Ensure that cloning metadir is suitable"""
 
776
        bzrdir = self.make_bzrdir('bzrdir')
 
777
        bzrdir.cloning_metadir()
 
778
        branch = self.make_branch('branch', format='knit')
 
779
        format = branch.bzrdir.cloning_metadir()
 
780
        self.assertIsInstance(format.workingtree_format,
 
781
            workingtree.WorkingTreeFormat3)
 
782
 
 
783
    def test_sprout_recursive_treeless(self):
 
784
        tree = self.make_branch_and_tree('tree1',
 
785
            format='dirstate-with-subtree')
 
786
        sub_tree = self.make_branch_and_tree('tree1/subtree',
 
787
            format='dirstate-with-subtree')
 
788
        tree.add_reference(sub_tree)
 
789
        self.build_tree(['tree1/subtree/file'])
 
790
        sub_tree.add('file')
 
791
        tree.commit('Initial commit')
 
792
        tree.bzrdir.destroy_workingtree()
 
793
        repo = self.make_repository('repo', shared=True,
 
794
            format='dirstate-with-subtree')
 
795
        repo.set_make_working_trees(False)
 
796
        tree.bzrdir.sprout('repo/tree2')
 
797
        self.failUnlessExists('repo/tree2/subtree')
 
798
        self.failIfExists('repo/tree2/subtree/file')
 
799
 
 
800
    def make_foo_bar_baz(self):
 
801
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
 
802
        bar = self.make_branch('foo/bar').bzrdir
 
803
        baz = self.make_branch('baz').bzrdir
 
804
        return foo, bar, baz
 
805
 
 
806
    def test_find_bzrdirs(self):
 
807
        foo, bar, baz = self.make_foo_bar_baz()
 
808
        transport = get_transport(self.get_url())
 
809
        self.assertEqualBzrdirs([baz, foo, bar],
 
810
                                bzrdir.BzrDir.find_bzrdirs(transport))
 
811
 
 
812
    def make_fake_permission_denied_transport(self, transport, paths):
 
813
        """Create a transport that raises PermissionDenied for some paths."""
 
814
        def filter(path):
 
815
            if path in paths:
 
816
                raise errors.PermissionDenied(path)
 
817
            return path
 
818
        path_filter_server = pathfilter.PathFilteringServer(transport, filter)
 
819
        path_filter_server.start_server()
 
820
        self.addCleanup(path_filter_server.stop_server)
 
821
        path_filter_transport = pathfilter.PathFilteringTransport(
 
822
            path_filter_server, '.')
 
823
        return (path_filter_server, path_filter_transport)
 
824
 
 
825
    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
 
826
        """Check that each branch url ends with the given suffix."""
 
827
        for actual_bzrdir in actual_bzrdirs:
 
828
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)
 
829
 
 
830
    def test_find_bzrdirs_permission_denied(self):
 
831
        foo, bar, baz = self.make_foo_bar_baz()
 
832
        transport = get_transport(self.get_url())
 
833
        path_filter_server, path_filter_transport = \
 
834
            self.make_fake_permission_denied_transport(transport, ['foo'])
 
835
        # local transport
 
836
        self.assertBranchUrlsEndWith('/baz/',
 
837
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
 
838
        # smart server
 
839
        smart_transport = self.make_smart_server('.',
 
840
            backing_server=path_filter_server)
 
841
        self.assertBranchUrlsEndWith('/baz/',
 
842
            bzrdir.BzrDir.find_bzrdirs(smart_transport))
 
843
 
 
844
    def test_find_bzrdirs_list_current(self):
 
845
        def list_current(transport):
 
846
            return [s for s in transport.list_dir('') if s != 'baz']
 
847
 
 
848
        foo, bar, baz = self.make_foo_bar_baz()
 
849
        transport = get_transport(self.get_url())
 
850
        self.assertEqualBzrdirs([foo, bar],
 
851
                                bzrdir.BzrDir.find_bzrdirs(transport,
 
852
                                    list_current=list_current))
 
853
 
 
854
    def test_find_bzrdirs_evaluate(self):
 
855
        def evaluate(bzrdir):
 
856
            try:
 
857
                repo = bzrdir.open_repository()
 
858
            except NoRepositoryPresent:
 
859
                return True, bzrdir.root_transport.base
 
860
            else:
 
861
                return False, bzrdir.root_transport.base
 
862
 
 
863
        foo, bar, baz = self.make_foo_bar_baz()
 
864
        transport = get_transport(self.get_url())
 
865
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
 
866
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
 
867
                                                         evaluate=evaluate)))
 
868
 
 
869
    def assertEqualBzrdirs(self, first, second):
 
870
        first = list(first)
 
871
        second = list(second)
 
872
        self.assertEqual(len(first), len(second))
 
873
        for x, y in zip(first, second):
 
874
            self.assertEqual(x.root_transport.base, y.root_transport.base)
 
875
 
 
876
    def test_find_branches(self):
 
877
        root = self.make_repository('', shared=True)
 
878
        foo, bar, baz = self.make_foo_bar_baz()
 
879
        qux = self.make_bzrdir('foo/qux')
 
880
        transport = get_transport(self.get_url())
 
881
        branches = bzrdir.BzrDir.find_branches(transport)
 
882
        self.assertEqual(baz.root_transport.base, branches[0].base)
 
883
        self.assertEqual(foo.root_transport.base, branches[1].base)
 
884
        self.assertEqual(bar.root_transport.base, branches[2].base)
 
885
 
 
886
        # ensure this works without a top-level repo
 
887
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
 
888
        self.assertEqual(foo.root_transport.base, branches[0].base)
 
889
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
890
 
 
891
 
 
892
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """Check find_branches when a branch's shared repository has gone."""

    def test_find_bzrdirs_missing_repo(self):
        """Everything find_branches returns has a URL ending in '/baz/'."""
        transport = get_transport(self.get_url())
        arepo = self.make_repository('arepo', shared=True)
        abranch_url = arepo.user_url + '/abranch'
        abranch_dir = bzrdir.BzrDir.create(abranch_url)
        abranch = abranch_dir.create_branch()
        # Remove the shared repository from underneath the branch.
        transport.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.Branch.open, abranch_url)
        self.make_branch('baz')
        for found in bzrdir.BzrDir.find_branches(transport):
            self.assertEndsWith(found.user_url, '/baz/')
 
905
 
 
906
 
 
907
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Component transports sit at branch, repository and checkout."""
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control = meta_dir.transport
        branch_base = control.clone('branch').base
        self.assertEqual(branch_base,
                         meta_dir.get_branch_transport(None).base)
        branch_format = bzrlib.branch.BzrBranchFormat5()
        self.assertEqual(branch_base,
                         meta_dir.get_branch_transport(branch_format).base)
        repository_base = control.clone('repository').base
        self.assertEqual(repository_base,
                         meta_dir.get_repository_transport(None).base)
        repo_format = weaverepo.RepositoryFormat7()
        self.assertEqual(repository_base,
                         meta_dir.get_repository_transport(repo_format).base)
        checkout_base = control.clone('checkout').base
        self.assertEqual(checkout_base,
                         meta_dir.get_workingtree_transport(None).base)
        tree_format = workingtree.WorkingTreeFormat3()
        self.assertEqual(checkout_base,
                         meta_dir.get_workingtree_transport(tree_format).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 guards the whole directory with a LockDir, not a file."""
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control = meta_dir.transport
        self.assertIsDirectory('branch-lock', control)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        tree formats.
        """
        make_format = bzrdir.format_registry.make_bzrdir
        mydir = make_format('knit')
        self.assertEqual(mydir, mydir)
        self.assertFalse(mydir != mydir)
        otherdir = make_format('knit')
        self.assertEqual(otherdir, mydir)
        self.assertFalse(otherdir != mydir)
        otherdir2 = make_format('dirstate-with-subtree')
        self.assertNotEqual(otherdir2, mydir)
        self.assertFalse(otherdir2 == mydir)

    def test_needs_conversion_different_working_tree(self):
        # meta1dirs need a conversion if any element is not the default.
        target_format = bzrdir.format_registry.make_bzrdir('dirstate')
        knit_tree = self.make_branch_and_tree('tree', format='knit')
        needs_it = knit_tree.bzrdir.needs_format_conversion(target_format)
        self.assertTrue(needs_it)

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over the smart server gives a RemoteBzrDir cheaply."""
        self.setup_smart_server_with_call_log()
        target_format = bzrdir.format_registry.make_bzrdir('dirstate')
        target = self.get_transport('target')
        target.ensure_base()
        self.reset_smart_call_log()
        created = target_format.initialize_on_transport(target)
        self.assertIsInstance(created, remote.RemoteBzrDir)
        rpc_count = len(self.hpss_calls)
        # This figure represents the amount of work to perform this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, rpc_count)
 
970
 
 
971
 
 
972
class TestFormat5(TestCaseWithTransport):
    """Tests specific to the version 5 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree of a format 5 dir share one lock."""
        def check_dir_components_use_same_lock(a_dir):
            repo_lock = a_dir.open_repository().control_files
            branch_lock = a_dir.open_branch().control_files
            tree_lock = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_lock is branch_lock)
            self.assertTrue(branch_lock is tree_lock)

        # First on the freshly initialized directory ...
        created = bzrdir.BzrDirFormat5().initialize(self.get_url())
        check_dir_components_use_same_lock(created)
        # ... and then on the same location opened the normal way.
        reopened = bzrdir.BzrDir.open(self.get_url())
        check_dir_components_use_same_lock(reopened)

    def test_can_convert(self):
        """Format 5 dirs are convertible."""
        v5_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        self.assertTrue(v5_dir.can_convert_format())

    def test_needs_conversion(self):
        """Format 5 needs converting to the default but not to itself."""
        v5_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        same_format = bzrdir.BzrDirFormat5()
        self.assertFalse(v5_dir.needs_format_conversion(same_format))
        default_format = bzrdir.BzrDirFormat.get_default_format()
        self.assertTrue(v5_dir.needs_format_conversion(default_format))
 
1004
 
 
1005
 
 
1006
class TestFormat6(TestCaseWithTransport):
    """Tests specific to the version 6 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree of a format 6 dir share one lock."""
        def check_dir_components_use_same_lock(a_dir):
            repo_lock = a_dir.open_repository().control_files
            branch_lock = a_dir.open_branch().control_files
            tree_lock = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_lock is branch_lock)
            self.assertTrue(branch_lock is tree_lock)

        # First on the freshly initialized directory ...
        created = bzrdir.BzrDirFormat6().initialize(self.get_url())
        check_dir_components_use_same_lock(created)
        # ... and then on the same location opened the normal way.
        reopened = bzrdir.BzrDir.open(self.get_url())
        check_dir_components_use_same_lock(reopened)

    def test_can_convert(self):
        """Format 6 dirs are convertible."""
        v6_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        self.assertTrue(v6_dir.can_convert_format())

    def test_needs_conversion(self):
        """Format 6 dirs need converting to the default format."""
        v6_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        default_format = bzrdir.BzrDirFormat.get_default_format()
        self.assertTrue(v6_dir.needs_format_conversion(default_format))
 
1034
 
 
1035
 
 
1036
class NotBzrDir(bzrlib.bzrdir.BzrDir):
    """A control directory kept in '.not' rather than '.bzr'."""

    def __init__(self, transport, format):
        # The base class __init__ is not called; only these attributes are set.
        control_transport = transport.clone('.not')
        self._format = format
        self.root_transport = transport
        self.transport = control_transport
 
1043
 
 
1044
 
 
1045
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
    """A test class representing any non-.bzr based disk format."""

    def initialize_on_transport(self, transport):
        """Initialize a new .not dir in the base directory of a Transport."""
        transport.mkdir('.not')
        return self.open(transport)

    def open(self, transport):
        """Open this directory."""
        return NotBzrDir(transport, self)

    @classmethod
    def _known_formats(cls):
        """Return a set holding a single NotBzrDirFormat instance."""
        return set([NotBzrDirFormat()])

    @classmethod
    def probe_transport(cls, transport):
        """Our format is present if the transport has a '.not' entry.

        Returns a NotBzrDirFormat instance in that case.
        """
        # NOTE(review): when '.not' is absent this falls through and returns
        # None rather than raising -- confirm that callers expect that.
        if transport.has('.not'):
            return NotBzrDirFormat()
 
1066
 
 
1067
 
 
1068
class TestNotBzrDir(TestCaseWithTransport):
    """Tests for using the bzrdir api with a non .bzr based disk format.

    If/when one of these is in the core, we can let the implementation tests
    verify this works.
    """

    def test_create_and_find_format(self):
        """A '.not' directory can be created and then probed for."""
        not_format = NotBzrDirFormat()
        created = not_format.initialize(self.get_url())
        self.assertIsInstance(created, NotBzrDir)
        # Register the format only for the duration of the probe.
        control_formats = bzrlib.bzrdir.BzrDirFormat
        control_formats.register_control_format(not_format)
        try:
            probed = control_formats.find_format(
                get_transport(self.get_url()))
            self.assertIsInstance(probed, NotBzrDirFormat)
        finally:
            control_formats.unregister_control_format(not_format)

    def test_included_in_known_formats(self):
        """A registered control format shows up in known_formats()."""
        control_formats = bzrlib.bzrdir.BzrDirFormat
        control_formats.register_control_format(NotBzrDirFormat)
        try:
            formats = control_formats.known_formats()
            matches = [candidate for candidate in formats
                       if isinstance(candidate, NotBzrDirFormat)]
            if not matches:
                self.fail("No NotBzrDirFormat in %s" % formats)
        finally:
            control_formats.unregister_control_format(NotBzrDirFormat)
 
1099
 
 
1100
 
 
1101
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        super(NonLocalTests, self).setUp()
        # Serve self.get_url() from memory so the URLs are not local paths.
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        """On a non-local URL we get a branch and repo, but no tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        target_url = self.get_url('foo')
        new_branch = bzrdir.BzrDir.create_branch_convenience(
            target_url, format=knit_format)
        self.assertRaises(errors.NoWorkingTree,
                          new_branch.bzrdir.open_workingtree)
        new_branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """Forcing a tree on a non-local URL fails and creates nothing."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_branch_convenience,
                          self.get_url('foo'),
                          force_new_tree=True,
                          format=knit_format)
        root = get_transport(self.get_url('.'))
        self.assertFalse(root.has('foo'))

    def test_clone(self):
        """Cloning to a non-local URL gives a branch and repo, but no tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        local_branch = bzrdir.BzrDir.create_branch_convenience(
            'local', format=knit_format)
        local_branch.bzrdir.open_workingtree()
        cloned = local_branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree, cloned.open_workingtree)
        cloned.open_branch()
        cloned.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir picks a tree format with no tree present."""
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        branch_dir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = branch_dir.checkout_metadir()
        tree_format = checkout_format.workingtree_format
        self.assertIsInstance(tree_format, workingtree.WorkingTreeFormat3)
 
1148
 
 
1149
 
 
1150
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This is a mixin: concrete subclasses MUST also inherit from
    TestCaseWithTwoWebservers.

    It cannot inherit from TestCaseWithTwoWebservers itself, because the test
    framework would then try to instantiate it, and it is incomplete on its
    own and could not run.
    """

    def create_transport_readonly_server(self):
        # The http protocol version is left at its default.
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        # The http protocol version is left at its default.
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        super(TestHTTPRedirections, self).setUp()
        # Redirections point at the new server.
        self.new_server = self.get_readonly_server()
        # Requests sent to the old server get redirected.
        self.old_server = self.get_secondary_server()
        target = self.new_server
        self.old_server.redirect_to(target.host, target.port)

    def test_loop(self):
        """Two servers redirecting to each other give NotBranchError."""
        # Point the new server back at the old one to close the loop.
        self.new_server.redirect_to(self.old_server.host,
                                    self.old_server.port)
        # Whichever server we start from, the loop must be reported.
        for server in (self.old_server, self.new_server):
            url = self._qualified_url(server.host, server.port)
            looping_transport = self._transport(url)
            self.assertRaises(errors.NotBranchError,
                              bzrdir.BzrDir.open_from_transport,
                              looping_transport)

    def test_qualifier_preserved(self):
        """Following a redirection keeps the original transport class."""
        wt = self.make_branch_and_tree('branch')
        old_url = self._qualified_url(self.old_server.host,
                                      self.old_server.port)
        old_transport = self._transport(old_url)
        start = old_transport.clone('branch')
        bdir = bzrdir.BzrDir.open_from_transport(start)
        # The qualifier selects the transport class, so the class must match.
        self.assertIsInstance(bdir.root_transport, type(start))
 
1202
 
 
1203
 
 
1204
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Build, permit and return the http+urllib URL for host:port."""
        url = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(url)
        return url
 
1214
 
 
1215
 
 
1216
 
 
1217
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    def _qualified_url(self, host, port):
        """Build, permit and return the http+pycurl URL for host:port."""
        url = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(url)
        return url
 
1226
 
 
1227
 
 
1228
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Build, permit and return the nosmart+http URL for host:port."""
        url = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(url)
        return url
 
1238
 
 
1239
 
 
1240
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the readonly decorator"""

    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Build, permit and return the readonly+http URL for host:port."""
        url = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(url)
        return url
 
1250
 
 
1251
 
 
1252
class TestDotBzrHidden(TestCaseWithTransport):
    """Check that a new control directory is absent from a plain listing."""

    # Command used to list the current directory; Windows has no 'ls', so
    # the shell's built-in 'dir /B' is used there instead.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its stdout split into lines.

        Fails the test if the command exits with a non-zero status.
        """
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def test_dot_bzr_hidden(self):
        """A control dir created from a local path is not listed."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        """A control dir created from a local URL is not listed."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())
 
1279
 
 
1280
 
 
1281
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat test double used by TestBzrDirSprout.

    Opening this format yields a _TestBzrDir.
    """

    def _open(self, transport):
        """Return a _TestBzrDir built from transport and this format."""
        opened = _TestBzrDir(transport, self)
        return opened
 
1286
 
 
1287
 
 
1288
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """BzrDir test double used by TestBzrDirSprout.

    Construction immediately gives the instance a branch (itself a test
    double, stored as ``test_branch``) whose repository is created through
    create_repository().
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base class, then attach the instrumented branch."""
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        # test_branch is bound before the repository is created, exactly as
        # the attribute order requires.
        self.test_branch = double = _TestBranch()
        double.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        """Return the instrumented branch; ``unsupported`` is ignored."""
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        """Return a new _TestBzrDirFormat; ``require_stacking`` is ignored."""
        return _TestBzrDirFormat()
 
1305
 
 
1306
 
 
1307
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Branch format test double used by TestBzrDirSprout.

    It adds nothing to bzrlib.branch.BranchFormat.
    """
 
1309
 
 
1310
 
 
1311
class _TestBranch(bzrlib.branch.Branch):
    """Branch test double used by TestBzrDirSprout.

    The names of the instrumented methods that get invoked are appended to
    ``calls`` so that tests can inspect them afterwards.
    """

    def __init__(self, *args, **kwargs):
        """Set the test format, run the base initialiser, reset the state."""
        # _format is assigned before the base class initialiser runs.
        self._format = _TestBranchFormat()
        super(_TestBranch, self).__init__(*args, **kwargs)
        self.calls, self._parent = [], None

    def sprout(self, *args, **kwargs):
        """Log the call and return a brand new _TestBranch."""
        self.calls.append('sprout')
        sprouted = _TestBranch()
        return sprouted

    def copy_content_into(self, destination, revision_id=None):
        """Log the call; nothing is copied."""
        self.calls.append('copy_content_into')

    def get_parent(self):
        """Return the value last given to set_parent(), initially None."""
        return self._parent

    def set_parent(self, parent):
        """Remember ``parent`` for later get_parent() calls."""
        self._parent = parent
 
1332
 
 
1333
 
 
1334
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for BzrDir.sprout using instrumented and real branches."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this test avoids them:
          - if there's no branch in the source bzrdir,
          - or if stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Build the instrumented source bzrdir on its own transport.
        source_transport = self.get_transport('source')
        source_transport.ensure_base()
        instrumented_format = _TestBzrDirFormat()
        source_bzrdir = instrumented_format.initialize_on_transport(
            source_transport)
        # The instrumented bzrdir exposes a test_branch attribute recording
        # the calls made to its branch; nothing has been recorded yet.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout it.  The return value is not inspected; only the calls
        # recorded on the source branch matter.
        target_url = self.get_url('target')
        _sprouted = source_bzrdir.sprout(target_url, recurse='no')

        # The branch's sprout method must have been used.
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)

    def test_sprout_parent(self):
        """A branch sprouted from a sprout has the middle branch as parent."""
        grandparent = self.make_branch('grandparent')
        parent_dir = grandparent.bzrdir.sprout('parent')
        parent_branch = parent_dir.open_branch()
        child_dir = parent_branch.bzrdir.sprout('branch')
        child_branch = child_dir.open_branch()
        self.assertContainsRe(child_branch.get_parent(), '/parent/$')
 
1369
 
 
1370
 
 
1371
class TestBzrDirHooks(TestCaseWithMemoryTransport):
 
1372
 
 
1373
    def test_pre_open_called(self):
 
1374
        calls = []
 
1375
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
 
1376
        transport = self.get_transport('foo')
 
1377
        url = transport.base
 
1378
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
 
1379
        self.assertEqual([transport.base], [t.base for t in calls])
 
1380
 
 
1381
    def test_pre_open_actual_exceptions_raised(self):
 
1382
        count = [0]
 
1383
        def fail_once(transport):
 
1384
            count[0] += 1
 
1385
            if count[0] == 1:
 
1386
                raise errors.BzrError("fail")
 
1387
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
 
1388
        transport = self.get_transport('foo')
 
1389
        url = transport.base
 
1390
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
 
1391
        self.assertEqual('fail', err._preformatted_string)
 
1392
 
 
1393
    def test_post_repo_init(self):
 
1394
        from bzrlib.bzrdir import RepoInitHookParams
 
1395
        calls = []
 
1396
        bzrdir.BzrDir.hooks.install_named_hook('post_repo_init',
 
1397
            calls.append, None)
 
1398
        self.make_repository('foo')
 
1399
        self.assertLength(1, calls)
 
1400
        params = calls[0]
 
1401
        self.assertIsInstance(params, RepoInitHookParams)
 
1402
        self.assertTrue(hasattr(params, 'bzrdir'))
 
1403
        self.assertTrue(hasattr(params, 'repository'))