~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: John Arbash Meinel
  • Date: 2011-05-11 11:35:28 UTC
  • mto: This revision was merged to the branch mainline in revision 5851.
  • Revision ID: john@arbash-meinel.com-20110511113528-qepibuwxicjrbb2h
Break compatibility with python <2.6.

This includes auditing the code for places where we were doing
explicit 'sys.version' checks and removing them as appropriate.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/per_bzr_dir.
 
20
"""
 
21
 
 
22
import os
 
23
import subprocess
 
24
import sys
 
25
 
 
26
from bzrlib import (
 
27
    branch,
 
28
    bzrdir,
 
29
    controldir,
 
30
    errors,
 
31
    help_topics,
 
32
    lock,
 
33
    repository,
 
34
    revision as _mod_revision,
 
35
    osutils,
 
36
    remote,
 
37
    symbol_versioning,
 
38
    transport as _mod_transport,
 
39
    urlutils,
 
40
    win32utils,
 
41
    workingtree_3,
 
42
    workingtree_4,
 
43
    )
 
44
import bzrlib.branch
 
45
from bzrlib.errors import (
 
46
    NotBranchError,
 
47
    NoColocatedBranchSupport,
 
48
    UnknownFormatError,
 
49
    UnsupportedFormatError,
 
50
    )
 
51
from bzrlib.tests import (
 
52
    TestCase,
 
53
    TestCaseWithMemoryTransport,
 
54
    TestCaseWithTransport,
 
55
    TestSkipped,
 
56
    )
 
57
from bzrlib.tests import(
 
58
    http_server,
 
59
    http_utils,
 
60
    )
 
61
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
62
from bzrlib.transport import (
 
63
    memory,
 
64
    pathfilter,
 
65
    )
 
66
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
67
from bzrlib.transport.nosmart import NoSmartTransportDecorator
 
68
from bzrlib.transport.readonly import ReadonlyTransportDecorator
 
69
from bzrlib.repofmt import knitrepo, knitpack_repo
 
70
 
 
71
 
 
72
class TestDefaultFormat(TestCase):
    """Check that the default control directory format can be swapped."""

    def test_get_set_default_format(self):
        """Install a sample default format, use it, then restore the old one."""
        get_default = bzrdir.BzrDirFormat.get_default_format
        set_default = controldir.ControlDirFormat._set_default_format
        original = get_default()
        # Before anything is changed the default is the meta-dir format.
        self.assertIsInstance(original, bzrdir.BzrDirMetaFormat1)
        set_default(SampleBzrDirFormat())
        try:
            # With the sample format installed, create() must hand back the
            # instrumented directory class.
            created = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(created, SampleBzrDir)
        finally:
            # Put the previous default back whether or not the check passed.
            set_default(original)
        self.assertEqual(original, get_default())
 
86
 
 
87
 
 
88
class DeprecatedBzrDirFormat(bzrdir.BzrDirFormat):
    """Placeholder control directory format registered as "deprecated".

    It adds nothing to bzrdir.BzrDirFormat.  The registry tests in this
    module register it both directly and lazily (by dotted module name and
    class name) and check that instances of it come back.
    """
 
90
 
 
91
 
 
92
class TestFormatRegistry(TestCase):
    """Tests for controldir.ControlDirFormatRegistry."""

    def make_format_registry(self):
        """Build a private registry holding one entry of each flavour.

        The entries are a deprecated format, a lazily registered format, the
        'knit' metadir (set as the default), an experimental 'branch6'
        metadir and three hidden entries.

        :return: the populated ControlDirFormatRegistry.
        """
        my_format_registry = controldir.ControlDirFormatRegistry()
        my_format_registry.register('deprecated', DeprecatedBzrDirFormat,
            'Some format.  Slower and unawesome and deprecated.',
            deprecated=True)
        my_format_registry.register_lazy('lazy', 'bzrlib.tests.test_bzrdir',
            'DeprecatedBzrDirFormat', 'Format registered lazily',
            deprecated=True)
        bzrdir.register_metadir(my_format_registry, 'knit',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
            'Format using knits',
            )
        my_format_registry.set_default('knit')
        bzrdir.register_metadir(my_format_registry,
            'branch6',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
            'Experimental successor to knit.  Use at your own risk.',
            branch_format='bzrlib.branch.BzrBranchFormat6',
            experimental=True)
        bzrdir.register_metadir(my_format_registry,
            'hidden format',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
            'Experimental successor to knit.  Use at your own risk.',
            branch_format='bzrlib.branch.BzrBranchFormat6', hidden=True)
        my_format_registry.register('hiddendeprecated', DeprecatedBzrDirFormat,
            'Old format.  Slower and does not support things. ', hidden=True)
        my_format_registry.register_lazy('hiddenlazy', 'bzrlib.tests.test_bzrdir',
            'DeprecatedBzrDirFormat', 'Format registered lazily',
            deprecated=True, hidden=True)
        return my_format_registry

    def test_format_registry(self):
        """make_bzrdir() returns the expected format for each registered key."""
        my_format_registry = self.make_format_registry()
        my_bzrdir = my_format_registry.make_bzrdir('lazy')
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
        my_bzrdir = my_format_registry.make_bzrdir('deprecated')
        self.assertIsInstance(my_bzrdir, DeprecatedBzrDirFormat)
        # 'default' was pointed at 'knit', so both give the Knit1 repository.
        my_bzrdir = my_format_registry.make_bzrdir('default')
        self.assertIsInstance(my_bzrdir.repository_format,
            knitrepo.RepositoryFormatKnit1)
        my_bzrdir = my_format_registry.make_bzrdir('knit')
        self.assertIsInstance(my_bzrdir.repository_format,
            knitrepo.RepositoryFormatKnit1)
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
        self.assertIsInstance(my_bzrdir.get_branch_format(),
                              bzrlib.branch.BzrBranchFormat6)

    def test_get_help(self):
        """get_help() returns the help string given at registration time."""
        my_format_registry = self.make_format_registry()
        self.assertEqual('Format registered lazily',
                         my_format_registry.get_help('lazy'))
        self.assertEqual('Format using knits',
                         my_format_registry.get_help('knit'))
        self.assertEqual('Format using knits',
                         my_format_registry.get_help('default'))
        self.assertEqual('Some format.  Slower and unawesome and deprecated.',
                         my_format_registry.get_help('deprecated'))

    def test_help_topic(self):
        """The registry's help_topic callable renders the format listings."""
        topics = help_topics.HelpTopicRegistry()
        registry = self.make_format_registry()
        topics.register('current-formats', registry.help_topic,
                        'Current formats')
        topics.register('other-formats', registry.help_topic,
                        'Other formats')
        new = topics.get_detail('current-formats')
        rest = topics.get_detail('other-formats')
        experimental, deprecated = rest.split('Deprecated formats')
        self.assertContainsRe(new, 'formats-help')
        # The patterns are raw strings so that '\(' reaches the regex engine
        # as written instead of being an invalid string escape; '\n' is then
        # interpreted by the regex engine and still matches a newline.
        self.assertContainsRe(new,
                r':knit:\n    \(native\) \(default\) Format using knits\n')
        self.assertContainsRe(experimental,
                r':branch6:\n    \(native\) Experimental successor to knit')
        self.assertContainsRe(deprecated,
                r':lazy:\n    \(native\) Format registered lazily\n')
        self.assertNotContainsRe(new, 'hidden')

    def test_set_default_repository(self):
        """set_default_repository() repoints 'default' in both registries."""
        default_factory = bzrdir.format_registry.get('default')
        # Find the real key currently behind the 'default' entry so that it
        # can be restored afterwards.
        old_default = [k for k, v in bzrdir.format_registry.iteritems()
                       if v == default_factory and k != 'default'][0]
        bzrdir.format_registry.set_default_repository('dirstate-with-subtree')
        try:
            self.assertIs(bzrdir.format_registry.get('dirstate-with-subtree'),
                          bzrdir.format_registry.get('default'))
            self.assertIs(
                repository.format_registry.get_default().__class__,
                knitrepo.RepositoryFormatKnit3)
        finally:
            bzrdir.format_registry.set_default_repository(old_default)

    def test_aliases(self):
        """aliases() lists only the keys registered with alias=True."""
        a_registry = controldir.ControlDirFormatRegistry()
        a_registry.register('deprecated', DeprecatedBzrDirFormat,
            'Old format.  Slower and does not support stuff',
            deprecated=True)
        a_registry.register('deprecatedalias', DeprecatedBzrDirFormat,
            'Old format.  Slower and does not support stuff',
            deprecated=True, alias=True)
        self.assertEqual(frozenset(['deprecatedalias']), a_registry.aliases())
 
194
 
 
195
 
 
196
class SampleBranch(branch.Branch):
    """Stand-in branch object that only remembers its control directory."""

    def __init__(self, dir):
        # The base-class constructor is not invoked here; only the owning
        # control directory is stored.
        self.bzrdir = dir
 
201
 
 
202
 
 
203
class SampleRepository(repository.Repository):
    """Stand-in repository object that only remembers its control directory."""

    def __init__(self, dir):
        # The base-class constructor is not invoked here; only the owning
        # control directory is stored.
        self.bzrdir = dir
 
208
 
 
209
 
 
210
class SampleBzrDir(bzrdir.BzrDir):
    """Instrumented BzrDir whose factory methods return canned objects."""

    def create_repository(self, shared=False):
        """Return a marker string in place of a repository."""
        return "A repository"

    def open_repository(self):
        """Return a SampleRepository bound to this directory."""
        return SampleRepository(self)

    def create_branch(self, name=None):
        """Return a SampleBranch bound to this directory.

        :raises NoColocatedBranchSupport: if a branch name is supplied.
        """
        if name is None:
            return SampleBranch(self)
        raise NoColocatedBranchSupport(self)

    def create_workingtree(self):
        """Return a marker string in place of a working tree."""
        return "A tree"
 
230
 
 
231
 
 
232
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
    """Sample control directory format.

    The format can be initialized but reports itself as unsupported, which
    lets the tests exercise both the open and the open_downlevel paths.
    """

    def get_format_string(self):
        """Return the marker text written to .bzr/branch-format."""
        return "Sample .bzr dir format."

    def initialize_on_transport(self, t):
        """Create .bzr on transport *t* and return a SampleBzrDir for it."""
        t.mkdir('.bzr')
        marker = self.get_format_string()
        t.put_bytes('.bzr/branch-format', marker)
        return SampleBzrDir(t, self)

    def is_supported(self):
        """Report this format as unsupported."""
        return False

    def open(self, transport, _found=None):
        """Return a fixed marker string rather than a real object."""
        return "opened branch."
 
254
 
 
255
 
 
256
class BzrDirFormatTest1(bzrdir.BzrDirMetaFormat1):
    """Meta-dir format variant identified by the string "Test format 1"."""

    @staticmethod
    def get_format_string():
        """Return the format string used to register and probe this format."""
        return "Test format 1"
 
261
 
 
262
 
 
263
class BzrDirFormatTest2(bzrdir.BzrDirMetaFormat1):
    """Meta-dir format variant identified by the string "Test format 2"."""

    @staticmethod
    def get_format_string():
        """Return the format string used to register and probe this format."""
        return "Test format 2"
 
268
 
 
269
 
 
270
class TestBzrDirFormat(TestCaseWithTransport):
    """Tests for the BzrDirFormat facility."""

    def test_find_format(self):
        """find_format() identifies each of two registered test formats."""
        # is the right format object found for a branch?
        # create a branch with a few known format objects.
        bzrdir.BzrProber.formats.register(BzrDirFormatTest1.get_format_string(),
            BzrDirFormatTest1())
        self.addCleanup(bzrdir.BzrProber.formats.remove,
            BzrDirFormatTest1.get_format_string())
        bzrdir.BzrProber.formats.register(BzrDirFormatTest2.get_format_string(),
            BzrDirFormatTest2())
        self.addCleanup(bzrdir.BzrProber.formats.remove,
            BzrDirFormatTest2.get_format_string())
        t = self.get_transport()
        self.build_tree(["foo/", "bar/"], transport=t)
        def check_format(format, url):
            # Initialize a directory in the given format, then probe it.
            format.initialize(url)
            t = _mod_transport.get_transport(url)
            found_format = bzrdir.BzrDirFormat.find_format(t)
            self.assertIsInstance(found_format, format.__class__)
        check_format(BzrDirFormatTest1(), "foo")
        check_format(BzrDirFormatTest2(), "bar")

    def test_find_format_nothing_there(self):
        """find_format() raises NotBranchError where no .bzr exists."""
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDirFormat.find_format,
                          _mod_transport.get_transport('.'))

    def test_find_format_unknown_format(self):
        """An empty branch-format file is reported as an unknown format."""
        t = self.get_transport()
        t.mkdir('.bzr')
        t.put_bytes('.bzr/branch-format', '')
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDirFormat.find_format,
                          _mod_transport.get_transport('.'))

    def test_register_unregister_format(self):
        """Registering and removing a format changes what can be opened."""
        format = SampleBzrDirFormat()
        url = self.get_url()
        # make a bzrdir
        format.initialize(url)
        # register a format for it.
        format_string = format.get_format_string()
        bzrdir.BzrProber.formats.register(format_string, format)
        try:
            # which bzrdir.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
            # which bzrdir.open_containing will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              bzrdir.BzrDir.open_containing, url)
            # but open_downlevel will work
            t = _mod_transport.get_transport(url)
            self.assertEqual(format.open(t),
                             bzrdir.BzrDir.open_unsupported(url))
        finally:
            # unregister the format, even when an assertion above failed, so
            # the sample format does not stay in the shared registry.
            bzrdir.BzrProber.formats.remove(format_string)
        # now open_downlevel should fail too.
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDir.open_unsupported, url)

    def test_create_branch_and_repo_uses_default(self):
        """create_branch_and_repo() builds its branch via the given format."""
        format = SampleBzrDirFormat()
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                      format=format)
        self.assertIsInstance(branch, SampleBranch)

    def test_create_branch_and_repo_under_shared(self):
        # creating a branch and repo in a shared repo uses the
        # shared repository
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(
            self.get_url('child'), format=format)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_and_repo_under_shared_force_new(self):
        # creating a branch and repo in a shared repo can be forced to
        # make a new repo
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                      force_new_repo=True,
                                                      format=format)
        branch.bzrdir.open_repository()

    def test_create_standalone_working_tree(self):
        """A readonly URL is rejected; a local path yields the sample tree."""
        format = SampleBzrDirFormat()
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url(), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
                                                           format=format)
        self.assertEqual('A tree', tree)

    def test_create_standalone_working_tree_under_shared_repo(self):
        # create standalone working tree always makes a repo.
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url('child'), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
            format=format)
        tree.bzrdir.open_repository()

    def test_create_branch_convenience(self):
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_possible_transports(self):
        """Check that the optional 'possible_transports' is recognized"""
        format = bzrdir.format_registry.make_bzrdir('knit')
        t = self.get_transport()
        branch = bzrdir.BzrDir.create_branch_convenience(
            '.', format=format, possible_transports=[t])
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_root(self):
        """Creating a branch at the root of a fs should work."""
        self.vfs_transport_factory = memory.MemoryServer
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_under_shared_repo(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_tree=False, format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_tree=True, format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
        # inside a repo the default convenience output is overridable to give
        # repo+branch+tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_repo=True, format=format)
        branch.bzrdir.open_repository()
        branch.bzrdir.open_workingtree()
 
460
 
 
461
 
 
462
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
    """Tests for repository acquisition and default stacking policies."""

    def test_acquire_repository_standalone(self):
        """With no configuration a non-shared repository is acquired in place."""
        control = self.make_bzrdir('.')
        policy = control.determine_repository_policy()
        repo, is_new = policy.acquire_repository()
        self.assertEqual(repo.bzrdir.root_transport.base,
                         control.root_transport.base)
        self.assertFalse(repo.is_shared())

    def test_determine_stacking_policy(self):
        """A child directory picks up the parent's default stack-on URL."""
        parent = self.make_bzrdir('.')
        child = self.make_bzrdir('child')
        parent.get_config().set_default_stack_on('http://example.org')
        policy = child.determine_repository_policy()
        self.assertEqual('http://example.org', policy._stack_on)

    def test_determine_stacking_policy_relative(self):
        """A relative stack-on value is kept along with the parent's base."""
        parent = self.make_bzrdir('.')
        child = self.make_bzrdir('child')
        parent.get_config().set_default_stack_on('child2')
        policy = child.determine_repository_policy()
        self.assertEqual('child2', policy._stack_on)
        self.assertEqual(parent.root_transport.base, policy._stack_on_pwd)

    def prepare_default_stacking(self, child_format='1.6'):
        """Set up a parent directory that stacks new branches on 'child'.

        :param child_format: format name used to create the 'child' branch.
        :return: (the 'child' branch, a transport for the sibling 'child2').
        """
        parent = self.make_bzrdir('.')
        stack_target = self.make_branch('child', format=child_format)
        parent.get_config().set_default_stack_on(stack_target.base)
        sibling_transport = parent.transport.clone('child2')
        return stack_target, sibling_transport

    def test_clone_on_transport_obeys_stacking_policy(self):
        """clone_on_transport() stacks the clone on the configured branch."""
        source, destination = self.prepare_default_stacking()
        cloned = source.bzrdir.clone_on_transport(destination)
        self.assertEqual(source.base,
                         cloned.open_branch().get_stacked_on_url())

    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
        """Cloning under a stacking policy completes without raising."""
        # Source: Branch7 format on top of a KnitPack1 repository.
        source_control = self.make_bzrdir('source')
        knitpack_repo.RepositoryFormatKnitPack1().initialize(source_control)
        source = bzrlib.branch.BzrBranchFormat7().initialize(source_control)
        # Destination parent carries a default stacking policy.
        parent = self.make_bzrdir('parent')
        stack_target = self.make_branch('parent/stacked-on',
                                        format='pack-0.92')
        parent.get_config().set_default_stack_on(stack_target.base)
        # The clone itself is the check; nothing is asserted afterwards.
        cloned = source_control.clone(self.get_url('parent/target'))

    def test_sprout_obeys_stacking_policy(self):
        """sprout() stacks the new branch on the configured branch."""
        source, destination = self.prepare_default_stacking()
        sprouted = source.bzrdir.sprout(destination.base)
        self.assertEqual(source.base,
                         sprouted.open_branch().get_stacked_on_url())

    def test_clone_ignores_policy_for_unsupported_formats(self):
        """A pack-0.92 clone is left unstacked despite the policy."""
        source, destination = self.prepare_default_stacking(
            child_format='pack-0.92')
        cloned = source.bzrdir.clone_on_transport(destination)
        self.assertRaises(errors.UnstackableBranchFormat,
                          cloned.open_branch().get_stacked_on_url)

    def test_sprout_ignores_policy_for_unsupported_formats(self):
        """A pack-0.92 sprout is left unstacked despite the policy."""
        source, destination = self.prepare_default_stacking(
            child_format='pack-0.92')
        sprouted = source.bzrdir.sprout(destination.base)
        self.assertRaises(errors.UnstackableBranchFormat,
                          sprouted.open_branch().get_stacked_on_url)

    def test_sprout_upgrades_format_if_stacked_specified(self):
        """sprout(stacked=True) yields a stackable, non-rich-root repository."""
        source, destination = self.prepare_default_stacking(
            child_format='pack-0.92')
        sprouted = source.bzrdir.sprout(destination.base, stacked=True)
        self.assertEqual(source.bzrdir.root_transport.base,
                         sprouted.open_branch().get_stacked_on_url())
        new_repo = sprouted.open_repository()
        self.assertTrue(new_repo._format.supports_external_lookups)
        self.assertFalse(new_repo.supports_rich_root())

    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
        """An explicit stacked_on yields a stackable, non-rich-root repository."""
        source, destination = self.prepare_default_stacking(
            child_format='pack-0.92')
        cloned = source.bzrdir.clone_on_transport(
            destination,
            stacked_on=source.bzrdir.root_transport.base)
        self.assertEqual(source.bzrdir.root_transport.base,
                         cloned.open_branch().get_stacked_on_url())
        new_repo = cloned.open_repository()
        self.assertTrue(new_repo._format.supports_external_lookups)
        self.assertFalse(new_repo.supports_rich_root())

    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
        """A stacked sprout of a rich-root source stays rich-root."""
        source, destination = self.prepare_default_stacking(
            child_format='rich-root-pack')
        sprouted = source.bzrdir.sprout(destination.base, stacked=True)
        new_repo = sprouted.open_repository()
        self.assertTrue(new_repo._format.supports_external_lookups)
        self.assertTrue(new_repo.supports_rich_root())

    def test_add_fallback_repo_handles_absolute_urls(self):
        """_add_fallback() accepts an absolute stack-on URL without raising."""
        fallback_branch = self.make_branch('stack_on', format='1.6')
        target_repo = self.make_repository('repo', format='1.6')
        policy = bzrdir.UseExistingRepository(target_repo,
                                              fallback_branch.base)
        policy._add_fallback(target_repo)

    def test_add_fallback_repo_handles_relative_urls(self):
        """_add_fallback() accepts '.' plus a base URL without raising."""
        fallback_branch = self.make_branch('stack_on', format='1.6')
        target_repo = self.make_repository('repo', format='1.6')
        policy = bzrdir.UseExistingRepository(target_repo, '.',
                                              fallback_branch.base)
        policy._add_fallback(target_repo)

    def test_configure_relative_branch_stacking_url(self):
        """A relative stack-on is stored relative to the stacked branch."""
        fallback_branch = self.make_branch('stack_on', format='1.6')
        nested = self.make_branch('stack_on/stacked', format='1.6')
        policy = bzrdir.UseExistingRepository(nested.repository, '.',
                                              fallback_branch.base)
        policy.configure_branch(nested)
        self.assertEqual('..', nested.get_stacked_on_url())

    def test_relative_branch_stacking_to_absolute(self):
        """With a readonly base URL the stored stack-on URL is that URL."""
        fallback_branch = self.make_branch('stack_on', format='1.6')
        nested = self.make_branch('stack_on/stacked', format='1.6')
        policy = bzrdir.UseExistingRepository(
            nested.repository, '.', self.get_readonly_url('stack_on'))
        policy.configure_branch(nested)
        self.assertEqual(self.get_readonly_url('stack_on'),
                         nested.get_stacked_on_url())
 
594
 
 
595
 
 
596
class ChrootedTests(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is a MemoryServer.
    If it is then we are chrooted already, if it is not then an HttpServer is
    used for readonly urls.
    """

    def setUp(self):
        """Use an HttpServer for readonly urls unless the vfs is in memory."""
        super(ChrootedTests, self).setUp()
        if self.vfs_transport_factory != memory.MemoryServer:
            self.transport_readonly_server = http_server.HttpServer

    def local_branch_path(self, branch):
        """Return the real local filesystem path for ``branch.base``."""
        return os.path.realpath(urlutils.local_path_from_url(branch.base))

    def test_open_containing(self):
        """open_containing raises before a control dir exists, then finds it.

        Once a control dir has been created at the root url, opening the root
        yields relpath '' and opening 'g/p/q' yields relpath 'g/p/q'.
        """
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_containing,
                          self.get_readonly_url(''))
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_containing,
                          self.get_readonly_url('g/p/q'))
        bzrdir.BzrDir.create(self.get_url())
        branch, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url(''))
        self.assertEqual('', relpath)
        branch, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url('g/p/q'))
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_tree_branch_or_repository_empty(self):
        """With nothing created, the lookup raises NotBranchError."""
        self.assertRaises(errors.NotBranchError,
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
            self.get_readonly_url(''))

    def test_open_containing_tree_branch_or_repository_all(self):
        """A branch with a tree yields tree, branch, repo and the relpath."""
        self.make_branch_and_tree('topdir')
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'topdir/foo')
        self.assertEqual(os.path.realpath('topdir'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         self.local_branch_path(branch))
        self.assertEqual(
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, 'foo')

    def test_open_containing_tree_branch_or_repository_no_tree(self):
        """A tree-less branch yields None for the tree."""
        self.make_branch('branch')
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'branch/foo')
        self.assertEqual(tree, None)
        self.assertEqual(os.path.realpath('branch'),
                         self.local_branch_path(branch))
        self.assertEqual(
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, 'foo')

    def test_open_containing_tree_branch_or_repository_repo(self):
        """A bare repository yields None for both tree and branch."""
        self.make_repository('repo')
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'repo')
        self.assertEqual(tree, None)
        self.assertEqual(branch, None)
        self.assertEqual(
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, '')

    def test_open_containing_tree_branch_or_repository_shared_repo(self):
        """A branch inside a shared repo reports the shared repository."""
        self.make_repository('shared', shared=True)
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
                                                force_new_tree=False)
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'shared/branch')
        self.assertEqual(tree, None)
        self.assertEqual(os.path.realpath('shared/branch'),
                         self.local_branch_path(branch))
        self.assertEqual(
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, '')

    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
        """Opening a subdirectory of a tree reports the subdir as relpath."""
        self.make_branch_and_tree('foo')
        self.build_tree(['foo/bar/'])
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'foo/bar')
        self.assertEqual(os.path.realpath('foo'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('foo'),
                         self.local_branch_path(branch))
        self.assertEqual(
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, 'bar')

    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
        """Opening a subdirectory of a bare repo reports it as relpath."""
        self.make_repository('bar')
        self.build_tree(['bar/baz/'])
        tree, branch, repo, relpath = \
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
                'bar/baz')
        self.assertEqual(tree, None)
        self.assertEqual(branch, None)
        self.assertEqual(
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
            repo.bzrdir.transport.local_abspath('repository'))
        self.assertEqual(relpath, 'baz')

    def test_open_containing_from_transport(self):
        """Same checks as test_open_containing, but passing transports."""
        self.assertRaises(errors.NotBranchError,
            bzrdir.BzrDir.open_containing_from_transport,
            _mod_transport.get_transport(self.get_readonly_url('')))
        self.assertRaises(errors.NotBranchError,
            bzrdir.BzrDir.open_containing_from_transport,
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
        bzrdir.BzrDir.create(self.get_url())
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
            _mod_transport.get_transport(self.get_readonly_url('')))
        self.assertEqual('', relpath)
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
            _mod_transport.get_transport(self.get_readonly_url('g/p/q')))
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_tree_or_branch(self):
        """Check the tree/branch/relpath triple for three kinds of location."""
        self.make_branch_and_tree('topdir')
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            'topdir/foo')
        self.assertEqual(os.path.realpath('topdir'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         self.local_branch_path(branch))
        self.assertIs(tree.bzrdir, branch.bzrdir)
        self.assertEqual('foo', relpath)
        # opening from non-local should not return the tree
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            self.get_readonly_url('topdir/foo'))
        self.assertEqual(None, tree)
        self.assertEqual('foo', relpath)
        # without a tree:
        self.make_branch('topdir/foo')
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            'topdir/foo')
        self.assertIs(tree, None)
        self.assertEqual(os.path.realpath('topdir/foo'),
                         self.local_branch_path(branch))
        self.assertEqual('', relpath)

    def test_open_tree_or_branch(self):
        """Check the tree/branch pair for three kinds of location."""
        self.make_branch_and_tree('topdir')
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
        self.assertEqual(os.path.realpath('topdir'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         self.local_branch_path(branch))
        self.assertIs(tree.bzrdir, branch.bzrdir)
        # opening from non-local should not return the tree
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
            self.get_readonly_url('topdir'))
        self.assertEqual(None, tree)
        # without a tree:
        self.make_branch('topdir/foo')
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
        self.assertIs(tree, None)
        self.assertEqual(os.path.realpath('topdir/foo'),
                         self.local_branch_path(branch))

    def test_open_from_transport(self):
        """The opened bzrdir's root transport has the given transport's base."""
        # transport pointing at bzrdir should give a bzrdir with root transport
        # set to the given transport
        bzrdir.BzrDir.create(self.get_url())
        t = self.get_transport()
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(t)
        self.assertEqual(t.base, opened_bzrdir.root_transport.base)
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)

    def test_open_from_transport_no_bzrdir(self):
        """Opening a transport with no control dir raises NotBranchError."""
        t = self.get_transport()
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, t)

    def test_open_from_transport_bzrdir_in_parent(self):
        """A control dir in the parent directory is not picked up."""
        bzrdir.BzrDir.create(self.get_url())
        t = self.get_transport()
        t.mkdir('subdir')
        t = t.clone('subdir')
        self.assertRaises(errors.NotBranchError,
                          bzrdir.BzrDir.open_from_transport, t)

    def test_sprout_recursive(self):
        """Sprouting a tree with a referenced subtree also brings the subtree."""
        tree = self.make_branch_and_tree('tree1',
                                         format='dirstate-with-subtree')
        sub_tree = self.make_branch_and_tree('tree1/subtree',
            format='dirstate-with-subtree')
        sub_tree.set_root_id('subtree-root')
        tree.add_reference(sub_tree)
        self.build_tree(['tree1/subtree/file'])
        sub_tree.add('file')
        tree.commit('Initial commit')
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
        tree2.lock_read()
        self.addCleanup(tree2.unlock)
        self.assertPathExists('tree2/subtree/file')
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))

    def test_cloning_metadir(self):
        """Ensure that cloning metadir is suitable"""
        # Local names avoid shadowing the imported 'bzrdir' module and the
        # 'format' builtin.
        a_bzrdir = self.make_bzrdir('bzrdir')
        a_bzrdir.cloning_metadir()
        knit_branch = self.make_branch('branch', format='knit')
        cloning_format = knit_branch.bzrdir.cloning_metadir()
        self.assertIsInstance(cloning_format.workingtree_format,
            workingtree_4.WorkingTreeFormat6)

    def test_sprout_recursive_treeless(self):
        """Sprouting after destroy_workingtree currently raises NotBranchError.

        See the FIXME comments below and bug #634470.
        """
        tree = self.make_branch_and_tree('tree1',
            format='dirstate-with-subtree')
        sub_tree = self.make_branch_and_tree('tree1/subtree',
            format='dirstate-with-subtree')
        tree.add_reference(sub_tree)
        self.build_tree(['tree1/subtree/file'])
        sub_tree.add('file')
        tree.commit('Initial commit')
        # The following line forces the orphaning to reveal bug #634470
        tree.branch.get_config().set_user_option(
            'bzr.transform.orphan_policy', 'move')
        tree.bzrdir.destroy_workingtree()
        # FIXME: subtree/.bzr is left here which allows the test to pass (or
        # fail :-( ) -- vila 20100909
        repo = self.make_repository('repo', shared=True,
            format='dirstate-with-subtree')
        repo.set_make_working_trees(False)
        # FIXME: we just deleted the workingtree and now we want to use it ????
        # At a minimum, we should use tree.branch below (but this fails too
        # currently) or stop calling this test 'treeless'. Specifically, I've
        # turned the line below into an assertRaises when 'subtree/.bzr' is
        # orphaned and sprout tries to access the branch there (which is left
        # by bzrdir.BzrDirMeta1.destroy_workingtree when it ignores the
        # [DeletingParent('Not deleting', u'subtree', None)] conflict). See bug
        # #634470.  -- vila 20100909
        self.assertRaises(errors.NotBranchError,
                          tree.bzrdir.sprout, 'repo/tree2')
#        self.assertPathExists('repo/tree2/subtree')
#        self.assertPathDoesNotExist('repo/tree2/subtree/file')

    def make_foo_bar_baz(self):
        """Create branches 'foo', 'foo/bar' and 'baz'; return their bzrdirs.

        :return: A (foo, bar, baz) tuple of the three branches' bzrdirs.
        """
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
        bar = self.make_branch('foo/bar').bzrdir
        baz = self.make_branch('baz').bzrdir
        return foo, bar, baz

    def test_find_bzrdirs(self):
        """find_bzrdirs yields baz, foo and bar, in that order."""
        foo, bar, baz = self.make_foo_bar_baz()
        t = self.get_transport()
        self.assertEqualBzrdirs([baz, foo, bar], bzrdir.BzrDir.find_bzrdirs(t))

    def make_fake_permission_denied_transport(self, transport, paths):
        """Create a transport that raises PermissionDenied for some paths.

        :param transport: The transport the path-filtering server wraps.
        :param paths: Paths for which PermissionDenied is raised.
        :return: A (path_filter_server, path_filter_transport) tuple.  The
            server is started here and stopped by a registered cleanup.
        """
        # Named to avoid shadowing the 'filter' builtin.
        def deny_listed_paths(path):
            if path in paths:
                raise errors.PermissionDenied(path)
            return path
        path_filter_server = pathfilter.PathFilteringServer(
            transport, deny_listed_paths)
        path_filter_server.start_server()
        self.addCleanup(path_filter_server.stop_server)
        path_filter_transport = pathfilter.PathFilteringTransport(
            path_filter_server, '.')
        return (path_filter_server, path_filter_transport)

    def assertBranchUrlsEndWith(self, expect_url_suffix, actual_bzrdirs):
        """Check that each branch url ends with the given suffix."""
        for actual_bzrdir in actual_bzrdirs:
            self.assertEndsWith(actual_bzrdir.user_url, expect_url_suffix)

    def test_find_bzrdirs_permission_denied(self):
        """With 'foo' denied, every bzrdir found has a url ending '/baz/'."""
        self.make_foo_bar_baz()
        t = self.get_transport()
        path_filter_server, path_filter_transport = \
            self.make_fake_permission_denied_transport(t, ['foo'])
        # local transport
        self.assertBranchUrlsEndWith('/baz/',
            bzrdir.BzrDir.find_bzrdirs(path_filter_transport))
        # smart server
        smart_transport = self.make_smart_server('.',
            backing_server=path_filter_server)
        self.assertBranchUrlsEndWith('/baz/',
            bzrdir.BzrDir.find_bzrdirs(smart_transport))

    def test_find_bzrdirs_list_current(self):
        """A list_current callback that omits 'baz' leaves only foo and bar."""
        def list_current(transport):
            return [s for s in transport.list_dir('') if s != 'baz']

        foo, bar, baz = self.make_foo_bar_baz()
        t = self.get_transport()
        self.assertEqualBzrdirs(
            [foo, bar],
            bzrdir.BzrDir.find_bzrdirs(t, list_current=list_current))

    def test_find_bzrdirs_evaluate(self):
        """find_bzrdirs yields the second item of what ``evaluate`` returns.

        The expected result holds only the baz and foo urls.
        """
        # The parameter keeps its original name; inside the callback it
        # shadows the imported 'bzrdir' module.
        def evaluate(bzrdir):
            try:
                bzrdir.open_repository()
            except errors.NoRepositoryPresent:
                return True, bzrdir.root_transport.base
            else:
                return False, bzrdir.root_transport.base

        foo, bar, baz = self.make_foo_bar_baz()
        t = self.get_transport()
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
                         list(bzrdir.BzrDir.find_bzrdirs(t, evaluate=evaluate)))

    def assertEqualBzrdirs(self, first, second):
        """Assert two iterables of bzrdirs match pairwise by root url.

        Both iterables are materialised as lists; their lengths must be equal
        and each pair must have the same ``root_transport.base``.
        """
        first = list(first)
        second = list(second)
        self.assertEqual(len(first), len(second))
        for x, y in zip(first, second):
            self.assertEqual(x.root_transport.base, y.root_transport.base)

    def test_find_branches(self):
        """find_branches returns baz, foo and bar, in that order."""
        self.make_repository('', shared=True)
        foo, bar, baz = self.make_foo_bar_baz()
        # 'foo/qux' is a control dir made without a branch; the assertions
        # below never expect it in the results.
        self.make_bzrdir('foo/qux')
        t = self.get_transport()
        branches = bzrdir.BzrDir.find_branches(t)
        self.assertEqual(baz.root_transport.base, branches[0].base)
        self.assertEqual(foo.root_transport.base, branches[1].base)
        self.assertEqual(bar.root_transport.base, branches[2].base)

        # ensure this works without a top-level repo
        branches = bzrdir.BzrDir.find_branches(t.clone('foo'))
        self.assertEqual(foo.root_transport.base, branches[0].base)
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
932
 
 
933
 
 
934
class TestMissingRepoBranchesSkipped(TestCaseWithMemoryTransport):
    """find_branches behaviour when a branch's shared repository is gone."""

    def test_find_bzrdirs_missing_repo(self):
        """Every branch found has a url ending in '/baz/'.

        A branch is created inside a shared repository whose '.bzr' is then
        deleted, so opening that branch raises NoRepositoryPresent.  A second,
        intact branch 'baz' is then created.
        """
        root_transport = self.get_transport()
        shared_repo = self.make_repository('arepo', shared=True)
        orphan_url = shared_repo.user_url + '/abranch'
        bzrdir.BzrDir.create(orphan_url).create_branch()
        root_transport.delete_tree('arepo/.bzr')
        self.assertRaises(errors.NoRepositoryPresent,
            branch.Branch.open, orphan_url)
        self.make_branch('baz')
        found_branches = bzrdir.BzrDir.find_branches(root_transport)
        for found_branch in found_branches:
            self.assertEndsWith(found_branch.user_url, '/baz/')
 
947
 
 
948
 
 
949
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        """Branch, repository and checkout transports use the expected bases.

        Each getter is checked twice: once with None and once with a format
        object.  Both must give the base of the matching clone of the control
        transport ('branch', 'repository' or 'checkout').
        """
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control_transport = meta_dir.transport

        expected_branch = control_transport.clone('branch').base
        self.assertEqual(expected_branch,
                         meta_dir.get_branch_transport(None).base)
        self.assertEqual(
            expected_branch,
            meta_dir.get_branch_transport(
                bzrlib.branch.BzrBranchFormat5()).base)

        expected_repo = control_transport.clone('repository').base
        self.assertEqual(expected_repo,
                         meta_dir.get_repository_transport(None).base)
        default_repo_format = repository.format_registry.get_default()
        self.assertEqual(
            expected_repo,
            meta_dir.get_repository_transport(default_repo_format).base)

        expected_checkout = control_transport.clone('checkout').base
        self.assertEqual(expected_checkout,
                         meta_dir.get_workingtree_transport(None).base)
        self.assertEqual(
            expected_checkout,
            meta_dir.get_workingtree_transport(
                workingtree_3.WorkingTreeFormat3()).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control_transport = meta_dir.transport
        self.assertIsDirectory('branch-lock', control_transport)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        tree formats.
        """
        make_format = bzrdir.format_registry.make_bzrdir
        knit_format = make_format('knit')
        self.assertEqual(knit_format, knit_format)
        self.assertFalse(knit_format != knit_format)
        second_knit_format = make_format('knit')
        self.assertEqual(second_knit_format, knit_format)
        self.assertFalse(second_knit_format != knit_format)
        subtree_format = make_format('dirstate-with-subtree')
        self.assertNotEqual(subtree_format, knit_format)
        self.assertFalse(subtree_format == knit_format)

    def test_needs_conversion_different_working_tree(self):
        """A 'knit' tree needs conversion to reach the 'dirstate' format."""
        # meta1dirs need a conversion if any element is not the default.
        target_format = bzrdir.format_registry.make_bzrdir('dirstate')
        knit_tree = self.make_branch_and_tree('tree', format='knit')
        conversion_needed = knit_tree.bzrdir.needs_format_conversion(
            target_format)
        self.assertTrue(conversion_needed)

    def test_initialize_on_format_uses_smart_transport(self):
        """Initializing over the smart server gives a RemoteBzrDir in 2 calls."""
        self.setup_smart_server_with_call_log()
        dirstate_format = bzrdir.format_registry.make_bzrdir('dirstate')
        target_transport = self.get_transport('target')
        target_transport.ensure_base()
        self.reset_smart_call_log()
        created = dirstate_format.initialize_on_transport(target_transport)
        self.assertIsInstance(created, remote.RemoteBzrDir)
        # This figure represents the amount of work needed for this use case.
        # It is entirely ok to reduce this number if a test fails due to
        # rpc_count being too low. If rpc_count increases, more network
        # roundtrips have become necessary for this use case. Please do not
        # adjust this number upwards without agreement from bzr's network
        # support maintainers.
        self.assertEqual(2, len(self.hpss_calls))
 
1013
 
 
1014
 
 
1015
class NonLocalTests(TestCaseWithTransport):
    """Tests for bzrdir static behaviour on non local paths."""

    def setUp(self):
        """Switch the vfs transport factory to a MemoryServer after setup."""
        super(NonLocalTests, self).setUp()
        self.vfs_transport_factory = memory.MemoryServer

    def test_create_branch_convenience(self):
        """The created branch has a repository but no working tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        target_url = self.get_url('foo')
        new_branch = bzrdir.BzrDir.create_branch_convenience(
            target_url, format=knit_format)
        control_dir = new_branch.bzrdir
        self.assertRaises(errors.NoWorkingTree, control_dir.open_workingtree)
        new_branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """force_new_tree=True raises NotLocalUrl and leaves 'foo' absent."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(
            errors.NotLocalUrl,
            bzrdir.BzrDir.create_branch_convenience,
            self.get_url('foo'),
            force_new_tree=True,
            format=knit_format)
        root_transport = self.get_transport()
        self.assertFalse(root_transport.has('foo'))

    def test_clone(self):
        """Cloning a local branch to a non-local url drops the working tree."""
        # clone into a nonlocal path works
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        local_branch = bzrdir.BzrDir.create_branch_convenience(
            'local', format=knit_format)
        local_branch.bzrdir.open_workingtree()
        remote_url = self.get_url('remote')
        cloned_dir = local_branch.bzrdir.clone(remote_url)
        self.assertRaises(errors.NoWorkingTree, cloned_dir.open_workingtree)
        cloned_dir.open_branch()
        cloned_dir.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir picks WorkingTreeFormat4 for a tree-less branch."""
        # checkout_metadir has reasonable working tree format even when no
        # working tree is present
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        branch_url = self.get_url('branch-knit2')
        opened_dir = bzrdir.BzrDir.open(branch_url)
        tree_format = opened_dir.checkout_metadir().workingtree_format
        self.assertIsInstance(tree_format, workingtree_4.WorkingTreeFormat4)
 
1062
 
 
1063
 
 
1064
class TestHTTPRedirections(object):
    """Test redirection between two http servers.

    This MUST be used by daughter classes that also inherit from
    TestCaseWithTwoWebservers.

    We can't inherit directly from TestCaseWithTwoWebservers or the
    test framework will try to create an instance which cannot
    run, its implementation being incomplete.

    Daughter classes supply ``_transport`` and ``_qualified_url``, which the
    tests below call.
    """

    def create_transport_readonly_server(self):
        """Return a redirecting http server (default protocol version)."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        """Return a redirecting http server (default protocol version)."""
        # We don't set the http protocol version, relying on the default
        return http_utils.HTTPServerRedirecting()

    def setUp(self):
        """Make the old (secondary) server redirect to the new (readonly) one."""
        super(TestHTTPRedirections, self).setUp()
        # Target of the redirections
        self.new_server = self.get_readonly_server()
        # Requests made to this server get redirected
        self.old_server = self.get_secondary_server()
        # Wire old -> new
        target = self.new_server
        self.old_server.redirect_to(target.host, target.port)

    def test_loop(self):
        """A redirection loop makes opening either server raise NotBranchError."""
        # Both servers redirect to each other creating a loop
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
        # Starting from either server should loop
        for server in (self.old_server, self.new_server):
            start_url = self._qualified_url(server.host, server.port)
            start_transport = self._transport(start_url)
            self.assertRaises(errors.NotBranchError,
                              bzrdir.BzrDir.open_from_transport,
                              start_transport)

    def test_qualifier_preserved(self):
        """The opened bzrdir's root transport keeps the starting class."""
        self.make_branch_and_tree('branch')
        old_server = self.old_server
        qualified = self._qualified_url(old_server.host, old_server.port)
        start = self._transport(qualified).clone('branch')
        opened_dir = bzrdir.BzrDir.open_from_transport(start)
        # Redirection should preserve the qualifier, hence the transport class
        # itself.
        self.assertIsInstance(opened_dir.root_transport, type(start))
 
1116
 
 
1117
 
 
1118
class TestHTTPRedirections_urllib(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for urllib implementation"""

    # Transport class used to open the qualified urls.
    _transport = HttpTransport_urllib

    def _qualified_url(self, host, port):
        """Build the 'http+urllib' url for host:port and permit it.

        :return: The url string, after passing it to ``self.permit_url``.
        """
        qualified = 'http+urllib://%s:%s' % (host, port)
        self.permit_url(qualified)
        return qualified
 
1128
 
 
1129
 
 
1130
 
 
1131
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for pycurl implementation"""

    def _qualified_url(self, host, port):
        """Build the 'http+pycurl' url for host:port and permit it.

        :return: The url string, after passing it to ``self.permit_url``.
        """
        qualified = 'http+pycurl://%s:%s' % (host, port)
        self.permit_url(qualified)
        return qualified
 
1140
 
 
1141
 
 
1142
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
                                  http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the nosmart decorator"""

    # Transport class used to open the qualified urls.
    _transport = NoSmartTransportDecorator

    def _qualified_url(self, host, port):
        """Build the 'nosmart+http' url for host:port and permit it.

        :return: The url string, after passing it to ``self.permit_url``.
        """
        qualified = 'nosmart+http://%s:%s' % (host, port)
        self.permit_url(qualified)
        return qualified
 
1152
 
 
1153
 
 
1154
class TestHTTPRedirections_readonly(TestHTTPRedirections,
                                    http_utils.TestCaseWithTwoWebservers):
    """Tests redirections for the readonly decorator"""

    # Transport class used to open the qualified urls.
    _transport = ReadonlyTransportDecorator

    def _qualified_url(self, host, port):
        """Build the 'readonly+http' url for host:port and permit it.

        :return: The url string, after passing it to ``self.permit_url``.
        """
        qualified = 'readonly+http://%s:%s' % (host, port)
        self.permit_url(qualified)
        return qualified
 
1164
 
 
1165
 
 
1166
class TestDotBzrHidden(TestCaseWithTransport):
    """Check that a new control directory is absent from a plain listing.

    Each test creates a bzrdir in the current directory, adds a file 'a', and
    expects the platform's listing command to report only 'a'.
    """

    # Command (argv list) used to list the current directory.  On win32 the
    # listing goes through the interpreter named by %COMSPEC%.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its output split into lines.

        Fails the test if the command exits with a non-zero status; the
        failure message includes the command and its stderr.
        """
        proc = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = proc.communicate()
        self.assertEqual(0, proc.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def _skip_unless_hiding_supported(self):
        """Raise TestSkipped on win32 when win32file is unavailable."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')

    def test_dot_bzr_hidden(self):
        """Creating the bzrdir from the path '.' leaves only 'a' listed."""
        self._skip_unless_hiding_supported()
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        """Creating the bzrdir from the url of '.' leaves only 'a' listed."""
        self._skip_unless_hiding_supported()
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())
 
1193
 
 
1194
 
 
1195
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
    """BzrDirFormat test double that opens _TestBzrDir objects.

    Used by TestBzrDirSprout.
    """

    def _open(self, transport):
        """Return a _TestBzrDir for ``transport`` that uses this format."""
        opened = _TestBzrDir(transport, self)
        return opened
 
1200
 
 
1201
 
 
1202
class _TestBzrDir(bzrdir.BzrDirMeta1):
    """BzrDir test double for TestBzrDirSprout.

    A freshly constructed _TestBzrDir already holds a branch and a
    repository.  The branch, exposed as ``test_branch``, is itself a test
    double (_TestBranch).
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base class, then attach the branch double."""
        super(_TestBzrDir, self).__init__(*args, **kwargs)
        branch_double = _TestBranch()
        # Publish the double on self before creating the repository, as the
        # original ordering did.
        self.test_branch = branch_double
        branch_double.repository = self.create_repository()

    def open_branch(self, unsupported=False):
        """Return the branch double; ``unsupported`` is ignored."""
        return self.test_branch

    def cloning_metadir(self, require_stacking=False):
        """Return a new _TestBzrDirFormat; ``require_stacking`` is ignored."""
        return _TestBzrDirFormat()
 
1219
 
 
1220
 
 
1221
class _TestBranchFormat(bzrlib.branch.BranchFormat):
    """Branch format test double for TestBzrDirSprout.

    Adds nothing to BranchFormat; _TestBranch instances use it as their
    ``_format``.
    """
 
1223
 
 
1224
 
 
1225
class _TestBranch(bzrlib.branch.Branch):
    """Branch test double for TestBzrDirSprout.

    Records the names of selected method calls in ``self.calls`` and keeps
    the parent location in memory.
    """

    def __init__(self, *args, **kwargs):
        """Set the format, run the base initialiser, then reset state."""
        # _format is assigned before the base initialiser runs -- presumably
        # the base class reads it; confirm against bzrlib.branch.Branch.
        self._format = _TestBranchFormat()
        super(_TestBranch, self).__init__(*args, **kwargs)
        self._parent = None
        self.calls = []

    def sprout(self, *args, **kwargs):
        """Record the call and return a new _TestBranch."""
        self.calls.append('sprout')
        sprouted = _TestBranch()
        return sprouted

    def copy_content_into(self, destination, revision_id=None):
        """Record the call; nothing is copied."""
        self.calls.append('copy_content_into')

    def last_revision(self):
        """Return NULL_REVISION."""
        return _mod_revision.NULL_REVISION

    def get_parent(self):
        """Return the value stored by set_parent (initially None)."""
        return self._parent

    def set_parent(self, parent):
        """Remember ``parent`` for later get_parent calls."""
        self._parent = parent

    def lock_read(self):
        """Return a LogicalLockResult wrapping ``self.unlock``."""
        return lock.LogicalLockResult(self.unlock)

    def unlock(self):
        """Do nothing."""
        return None
 
1255
 
 
1256
 
 
1257
class TestBzrDirSprout(TestCaseWithMemoryTransport):
    """Tests for BzrDir.sprout using the instrumented test doubles."""

    def test_sprout_uses_branch_sprout(self):
        """BzrDir.sprout calls Branch.sprout.

        Usually, BzrDir.sprout should delegate to the branch's sprout method
        for part of the work.  This allows the source branch to control the
        choice of format for the new branch.

        There are exceptions, but this test avoids them:
          - if there's no branch in the source bzrdir,
          - or if the stacking has been requested and the format needs to be
            overridden to satisfy that.
        """
        # Build the instrumented source bzrdir.
        source_transport = self.get_transport('source')
        source_transport.ensure_base()
        instrumented_format = _TestBzrDirFormat()
        source_bzrdir = instrumented_format.initialize_on_transport(
            source_transport)
        # Its test_branch attribute logs calls made to the contained branch;
        # at this point the branch exists but nothing has been logged.
        self.assertEqual([], source_bzrdir.test_branch.calls)

        # Sprout to the target url.
        source_bzrdir.sprout(self.get_url('target'), recurse='no')

        # The branch's sprout method must be among the logged calls.
        logged_calls = source_bzrdir.test_branch.calls
        self.assertSubset(['sprout'], logged_calls)

    def test_sprout_parent(self):
        """A branch sprouted from 'parent' has a parent ending '/parent/'."""
        grandparent_branch = self.make_branch('grandparent')
        parent_dir = grandparent_branch.bzrdir.sprout('parent')
        parent_branch = parent_dir.open_branch()
        child_dir = parent_branch.bzrdir.sprout('branch')
        child_branch = child_dir.open_branch()
        self.assertContainsRe(child_branch.get_parent(), '/parent/$')
 
1292
 
 
1293
 
 
1294
class TestBzrDirHooks(TestCaseWithMemoryTransport):
    """Tests for the 'pre_open' and 'post_repo_init' BzrDir hooks."""

    def test_pre_open_called(self):
        """BzrDir.open runs the pre_open hook with a transport for the URL."""
        seen = []
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', seen.append, None)
        transport = self.get_transport('foo')
        self.assertRaises(
            errors.NotBranchError, bzrdir.BzrDir.open, transport.base)
        seen_bases = [hooked.base for hooked in seen]
        self.assertEqual([transport.base], seen_bases)

    def test_pre_open_actual_exceptions_raised(self):
        """An error raised by a pre_open hook comes out of BzrDir.open."""
        # One-element list used as a mutable counter for the closure.
        attempts = [0]

        def fail_once(transport):
            attempts[0] += 1
            if attempts[0] != 1:
                return
            # Only the very first invocation fails.
            raise errors.BzrError("fail")

        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
        transport = self.get_transport('foo')
        err = self.assertRaises(
            errors.BzrError, bzrdir.BzrDir.open, transport.base)
        self.assertEqual('fail', err._preformatted_string)

    def test_post_repo_init(self):
        """make_repository fires post_repo_init once, with hook params."""
        received = []
        bzrdir.BzrDir.hooks.install_named_hook(
            'post_repo_init', received.append, None)
        self.make_repository('foo')
        self.assertLength(1, received)
        params = received[0]
        self.assertIsInstance(params, bzrdir.RepoInitHookParams)
        for attribute in ('bzrdir', 'repository'):
            self.assertTrue(hasattr(params, attribute))

    def test_post_repo_init_hook_repr(self):
        """repr() of the hook params starts with '<RepoInitHookParams for '."""
        reprs = []

        def record_repr(params):
            reprs.append(repr(params))

        bzrdir.BzrDir.hooks.install_named_hook(
            'post_repo_init', record_repr, None)
        self.make_repository('foo')
        self.assertLength(1, reprs)
        self.assertStartsWith(reprs[0], '<RepoInitHookParams for ')
 
1336
 
 
1337
 
 
1338
class TestGenerateBackupName(TestCaseWithMemoryTransport):
 
1339
    # FIXME: This may need to be unified with test_osutils.TestBackupNames or
    # moved to per_bzrdir or per_transport for better coverage?
    # -- vila 20100909
 
1342
 
 
1343
    def setUp(self):
 
1344
        super(TestGenerateBackupName, self).setUp()
 
1345
        self._transport = self.get_transport()
 
1346
        bzrdir.BzrDir.create(self.get_url(),
 
1347
            possible_transports=[self._transport])
 
1348
        self._bzrdir = bzrdir.BzrDir.open_from_transport(self._transport)
 
1349
 
 
1350
    def test_deprecated_generate_backup_name(self):
 
1351
        res = self.applyDeprecated(
 
1352
                symbol_versioning.deprecated_in((2, 3, 0)),
 
1353
                self._bzrdir.generate_backup_name, 'whatever')
 
1354
 
 
1355
    def test_new(self):
 
1356
        self.assertEqual("a.~1~", self._bzrdir._available_backup_name("a"))
 
1357
 
 
1358
    def test_exiting(self):
 
1359
        self._transport.put_bytes("a.~1~", "some content")
 
1360
        self.assertEqual("a.~2~", self._bzrdir._available_backup_name("a"))
 
1361