~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Aaron Bentley
  • Date: 2007-06-21 23:43:17 UTC
  • mto: (2520.5.2 bzr.mpbundle)
  • mto: This revision was merged to the branch mainline in revision 2631.
  • Revision ID: abentley@panoramicfeedback.com-20070621234317-5w3h8h36oe90sups
Implement new merge directive format

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2010 Canonical Ltd
2
 
#
 
1
    # Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the BzrDir facility and any format specific tests.
18
18
 
19
 
For interface contract tests, see tests/per_bzr_dir.
 
19
For interface contract tests, see tests/bzr_dir_implementations.
20
20
"""
21
21
 
22
 
import os
23
 
import subprocess
24
 
import sys
 
22
import os.path
 
23
from StringIO import StringIO
25
24
 
26
25
from bzrlib import (
27
26
    bzrdir,
28
27
    errors,
29
28
    help_topics,
30
29
    repository,
31
 
    osutils,
32
 
    remote,
 
30
    symbol_versioning,
33
31
    urlutils,
34
 
    win32utils,
35
32
    workingtree,
36
33
    )
37
34
import bzrlib.branch
41
38
                           )
42
39
from bzrlib.tests import (
43
40
    TestCase,
44
 
    TestCaseWithMemoryTransport,
45
41
    TestCaseWithTransport,
46
 
    TestSkipped,
 
42
    test_sftp_transport
47
43
    )
48
 
from bzrlib.tests import(
49
 
    http_server,
50
 
    http_utils,
 
44
from bzrlib.tests.HttpServer import HttpServer
 
45
from bzrlib.tests.HTTPTestUtil import (
 
46
    TestCaseWithTwoWebservers,
 
47
    HTTPServerRedirecting,
51
48
    )
52
49
from bzrlib.tests.test_http import TestWithTransport_pycurl
53
 
from bzrlib.transport import (
54
 
    get_transport,
55
 
    memory,
56
 
    )
 
50
from bzrlib.transport import get_transport
57
51
from bzrlib.transport.http._urllib import HttpTransport_urllib
58
 
from bzrlib.transport.nosmart import NoSmartTransportDecorator
59
 
from bzrlib.transport.readonly import ReadonlyTransportDecorator
60
 
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
52
from bzrlib.transport.memory import MemoryServer
 
53
from bzrlib.repofmt import knitrepo, weaverepo
61
54
 
62
55
 
63
56
class TestDefaultFormat(TestCase):
66
59
        old_format = bzrdir.BzrDirFormat.get_default_format()
67
60
        # default is BzrDirFormat6
68
61
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
69
 
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
62
        self.applyDeprecated(symbol_versioning.zero_fourteen, 
 
63
                             bzrdir.BzrDirFormat.set_default_format, 
 
64
                             SampleBzrDirFormat())
70
65
        # creating a bzr dir should now create an instrumented dir.
71
66
        try:
72
67
            result = bzrdir.BzrDir.create('memory:///')
73
68
            self.failUnless(isinstance(result, SampleBzrDir))
74
69
        finally:
75
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
70
            self.applyDeprecated(symbol_versioning.zero_fourteen,
 
71
                bzrdir.BzrDirFormat.set_default_format, old_format)
76
72
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
77
73
 
78
74
 
83
79
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
84
80
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
85
81
            ' repositories', deprecated=True)
86
 
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
 
82
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir', 
87
83
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
88
84
        my_format_registry.register_metadir('knit',
89
85
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
94
90
            'branch6',
95
91
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
96
92
            'Experimental successor to knit.  Use at your own risk.',
97
 
            branch_format='bzrlib.branch.BzrBranchFormat6',
98
 
            experimental=True)
 
93
            branch_format='bzrlib.branch.BzrBranchFormat6')
99
94
        my_format_registry.register_metadir(
100
95
            'hidden format',
101
96
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
116
111
        my_bzrdir = my_format_registry.make_bzrdir('weave')
117
112
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
118
113
        my_bzrdir = my_format_registry.make_bzrdir('default')
119
 
        self.assertIsInstance(my_bzrdir.repository_format,
 
114
        self.assertIsInstance(my_bzrdir.repository_format, 
120
115
            knitrepo.RepositoryFormatKnit1)
121
116
        my_bzrdir = my_format_registry.make_bzrdir('knit')
122
 
        self.assertIsInstance(my_bzrdir.repository_format,
 
117
        self.assertIsInstance(my_bzrdir.repository_format, 
123
118
            knitrepo.RepositoryFormatKnit1)
124
119
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
125
120
        self.assertIsInstance(my_bzrdir.get_branch_format(),
129
124
        my_format_registry = self.make_format_registry()
130
125
        self.assertEqual('Format registered lazily',
131
126
                         my_format_registry.get_help('lazy'))
132
 
        self.assertEqual('Format using knits',
 
127
        self.assertEqual('Format using knits', 
133
128
                         my_format_registry.get_help('knit'))
134
 
        self.assertEqual('Format using knits',
 
129
        self.assertEqual('Format using knits', 
135
130
                         my_format_registry.get_help('default'))
136
131
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
137
 
                         ' checkouts or shared repositories',
 
132
                         ' checkouts or shared repositories', 
138
133
                         my_format_registry.get_help('weave'))
139
 
 
 
134
        
140
135
    def test_help_topic(self):
141
136
        topics = help_topics.HelpTopicRegistry()
142
 
        registry = self.make_format_registry()
143
 
        topics.register('current-formats', registry.help_topic,
144
 
                        'Current formats')
145
 
        topics.register('other-formats', registry.help_topic,
146
 
                        'Other formats')
147
 
        new = topics.get_detail('current-formats')
148
 
        rest = topics.get_detail('other-formats')
149
 
        experimental, deprecated = rest.split('Deprecated formats')
150
 
        self.assertContainsRe(new, 'formats-help')
151
 
        self.assertContainsRe(new,
152
 
                ':knit:\n    \(native\) \(default\) Format using knits\n')
153
 
        self.assertContainsRe(experimental,
154
 
                ':branch6:\n    \(native\) Experimental successor to knit')
155
 
        self.assertContainsRe(deprecated,
156
 
                ':lazy:\n    \(native\) Format registered lazily\n')
 
137
        topics.register('formats', self.make_format_registry().help_topic, 
 
138
                        'Directory formats')
 
139
        topic = topics.get_detail('formats')
 
140
        new, deprecated = topic.split('Deprecated formats')
 
141
        self.assertContainsRe(new, 'Bazaar directory formats')
 
142
        self.assertContainsRe(new, 
 
143
            '  knit/default:\n    \(native\) Format using knits\n')
 
144
        self.assertContainsRe(deprecated, 
 
145
            '  lazy:\n    \(native\) Format registered lazily\n')
157
146
        self.assertNotContainsRe(new, 'hidden')
158
147
 
159
148
    def test_set_default_repository(self):
170
159
        finally:
171
160
            bzrdir.format_registry.set_default_repository(old_default)
172
161
 
173
 
    def test_aliases(self):
174
 
        a_registry = bzrdir.BzrDirFormatRegistry()
175
 
        a_registry.register('weave', bzrdir.BzrDirFormat6,
176
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
177
 
            ' repositories', deprecated=True)
178
 
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
179
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
180
 
            ' repositories', deprecated=True, alias=True)
181
 
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
182
 
 
183
162
 
184
163
class SampleBranch(bzrlib.branch.Branch):
185
164
    """A dummy branch for guess what, dummy use."""
188
167
        self.bzrdir = dir
189
168
 
190
169
 
191
 
class SampleRepository(bzrlib.repository.Repository):
192
 
    """A dummy repo."""
193
 
 
194
 
    def __init__(self, dir):
195
 
        self.bzrdir = dir
196
 
 
197
 
 
198
170
class SampleBzrDir(bzrdir.BzrDir):
199
171
    """A sample BzrDir implementation to allow testing static methods."""
200
172
 
204
176
 
205
177
    def open_repository(self):
206
178
        """See BzrDir.open_repository."""
207
 
        return SampleRepository(self)
 
179
        return "A repository"
208
180
 
209
181
    def create_branch(self):
210
182
        """See BzrDir.create_branch."""
218
190
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
219
191
    """A sample format
220
192
 
221
 
    this format is initializable, unsupported to aid in testing the
 
193
    this format is initializable, unsupported to aid in testing the 
222
194
    open and open_downlevel routines.
223
195
    """
224
196
 
226
198
        """See BzrDirFormat.get_format_string()."""
227
199
        return "Sample .bzr dir format."
228
200
 
229
 
    def initialize_on_transport(self, t):
 
201
    def initialize(self, url):
230
202
        """Create a bzr dir."""
 
203
        t = get_transport(url)
231
204
        t.mkdir('.bzr')
232
205
        t.put_bytes('.bzr/branch-format', self.get_format_string())
233
206
        return SampleBzrDir(t, self)
245
218
    def test_find_format(self):
246
219
        # is the right format object found for a branch?
247
220
        # create a branch with a few known format objects.
248
 
        # this is not quite the same as
 
221
        # this is not quite the same as 
249
222
        t = get_transport(self.get_url())
250
223
        self.build_tree(["foo/", "bar/"], transport=t)
251
224
        def check_format(format, url):
255
228
            self.failUnless(isinstance(found_format, format.__class__))
256
229
        check_format(bzrdir.BzrDirFormat5(), "foo")
257
230
        check_format(bzrdir.BzrDirFormat6(), "bar")
258
 
 
 
231
        
259
232
    def test_find_format_nothing_there(self):
260
233
        self.assertRaises(NotBranchError,
261
234
                          bzrdir.BzrDirFormat.find_format,
288
261
        # now open_downlevel should fail too.
289
262
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
290
263
 
 
264
    def test_create_repository(self):
 
265
        format = SampleBzrDirFormat()
 
266
        repo = bzrdir.BzrDir.create_repository(self.get_url(), format=format)
 
267
        self.assertEqual('A repository', repo)
 
268
 
 
269
    def test_create_repository_shared(self):
 
270
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
271
        repo = bzrdir.BzrDir.create_repository('.', shared=True)
 
272
        self.assertTrue(repo.is_shared())
 
273
 
 
274
    def test_create_repository_nonshared(self):
 
275
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
276
        repo = bzrdir.BzrDir.create_repository('.')
 
277
        self.assertFalse(repo.is_shared())
 
278
 
 
279
    def test_create_repository_under_shared(self):
 
280
        # an explicit create_repository always does so.
 
281
        # we trust the format is right from the 'create_repository test'
 
282
        format = bzrdir.format_registry.make_bzrdir('knit')
 
283
        self.make_repository('.', shared=True, format=format)
 
284
        repo = bzrdir.BzrDir.create_repository(self.get_url('child'),
 
285
                                               format=format)
 
286
        self.assertTrue(isinstance(repo, repository.Repository))
 
287
        self.assertTrue(repo.bzrdir.root_transport.base.endswith('child/'))
 
288
 
291
289
    def test_create_branch_and_repo_uses_default(self):
292
290
        format = SampleBzrDirFormat()
293
 
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
291
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(), 
294
292
                                                      format=format)
295
293
        self.assertTrue(isinstance(branch, SampleBranch))
296
294
 
305
303
                          branch.bzrdir.open_repository)
306
304
 
307
305
    def test_create_branch_and_repo_under_shared_force_new(self):
308
 
        # creating a branch and repo in a shared repo can be forced to
 
306
        # creating a branch and repo in a shared repo can be forced to 
309
307
        # make a new repo
310
308
        format = bzrdir.format_registry.make_bzrdir('knit')
311
309
        self.make_repository('.', shared=True, format=format)
316
314
 
317
315
    def test_create_standalone_working_tree(self):
318
316
        format = SampleBzrDirFormat()
319
 
        # note this is deliberately readonly, as this failure should
 
317
        # note this is deliberately readonly, as this failure should 
320
318
        # occur before any writes.
321
319
        self.assertRaises(errors.NotLocalUrl,
322
320
                          bzrdir.BzrDir.create_standalone_workingtree,
323
321
                          self.get_readonly_url(), format=format)
324
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
 
322
        tree = bzrdir.BzrDir.create_standalone_workingtree('.', 
325
323
                                                           format=format)
326
324
        self.assertEqual('A tree', tree)
327
325
 
329
327
        # create standalone working tree always makes a repo.
330
328
        format = bzrdir.format_registry.make_bzrdir('knit')
331
329
        self.make_repository('.', shared=True, format=format)
332
 
        # note this is deliberately readonly, as this failure should
 
330
        # note this is deliberately readonly, as this failure should 
333
331
        # occur before any writes.
334
332
        self.assertRaises(errors.NotLocalUrl,
335
333
                          bzrdir.BzrDir.create_standalone_workingtree,
336
334
                          self.get_readonly_url('child'), format=format)
337
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
 
335
        tree = bzrdir.BzrDir.create_standalone_workingtree('child', 
338
336
            format=format)
339
337
        tree.bzrdir.open_repository()
340
338
 
345
343
        branch.bzrdir.open_workingtree()
346
344
        branch.bzrdir.open_repository()
347
345
 
348
 
    def test_create_branch_convenience_possible_transports(self):
349
 
        """Check that the optional 'possible_transports' is recognized"""
350
 
        format = bzrdir.format_registry.make_bzrdir('knit')
351
 
        t = self.get_transport()
352
 
        branch = bzrdir.BzrDir.create_branch_convenience(
353
 
            '.', format=format, possible_transports=[t])
354
 
        branch.bzrdir.open_workingtree()
355
 
        branch.bzrdir.open_repository()
356
 
 
357
346
    def test_create_branch_convenience_root(self):
358
347
        """Creating a branch at the root of a fs should work."""
359
 
        self.vfs_transport_factory = memory.MemoryServer
 
348
        self.vfs_transport_factory = MemoryServer
360
349
        # outside a repo the default convenience output is a repo+branch_tree
361
350
        format = bzrdir.format_registry.make_bzrdir('knit')
362
 
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
 
351
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(), 
363
352
                                                         format=format)
364
353
        self.assertRaises(errors.NoWorkingTree,
365
354
                          branch.bzrdir.open_workingtree)
375
364
        branch.bzrdir.open_workingtree()
376
365
        self.assertRaises(errors.NoRepositoryPresent,
377
366
                          branch.bzrdir.open_repository)
378
 
 
 
367
            
379
368
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
380
369
        # inside a repo the default convenience output is a branch+ follow the
381
370
        # repo tree policy but we can override that
387
376
                          branch.bzrdir.open_workingtree)
388
377
        self.assertRaises(errors.NoRepositoryPresent,
389
378
                          branch.bzrdir.open_repository)
390
 
 
 
379
            
391
380
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
392
381
        # inside a repo the default convenience output is a branch+ follow the
393
382
        # repo tree policy
394
383
        format = bzrdir.format_registry.make_bzrdir('knit')
395
384
        repo = self.make_repository('.', shared=True, format=format)
396
385
        repo.set_make_working_trees(False)
397
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
386
        branch = bzrdir.BzrDir.create_branch_convenience('child', 
398
387
                                                         format=format)
399
388
        self.assertRaises(errors.NoWorkingTree,
400
389
                          branch.bzrdir.open_workingtree)
424
413
        branch.bzrdir.open_workingtree()
425
414
 
426
415
 
427
 
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
428
 
 
429
 
    def test_acquire_repository_standalone(self):
430
 
        """The default acquisition policy should create a standalone branch."""
431
 
        my_bzrdir = self.make_bzrdir('.')
432
 
        repo_policy = my_bzrdir.determine_repository_policy()
433
 
        repo, is_new = repo_policy.acquire_repository()
434
 
        self.assertEqual(repo.bzrdir.root_transport.base,
435
 
                         my_bzrdir.root_transport.base)
436
 
        self.assertFalse(repo.is_shared())
437
 
 
438
 
    def test_determine_stacking_policy(self):
439
 
        parent_bzrdir = self.make_bzrdir('.')
440
 
        child_bzrdir = self.make_bzrdir('child')
441
 
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
442
 
        repo_policy = child_bzrdir.determine_repository_policy()
443
 
        self.assertEqual('http://example.org', repo_policy._stack_on)
444
 
 
445
 
    def test_determine_stacking_policy_relative(self):
446
 
        parent_bzrdir = self.make_bzrdir('.')
447
 
        child_bzrdir = self.make_bzrdir('child')
448
 
        parent_bzrdir.get_config().set_default_stack_on('child2')
449
 
        repo_policy = child_bzrdir.determine_repository_policy()
450
 
        self.assertEqual('child2', repo_policy._stack_on)
451
 
        self.assertEqual(parent_bzrdir.root_transport.base,
452
 
                         repo_policy._stack_on_pwd)
453
 
 
454
 
    def prepare_default_stacking(self, child_format='1.6'):
455
 
        parent_bzrdir = self.make_bzrdir('.')
456
 
        child_branch = self.make_branch('child', format=child_format)
457
 
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
458
 
        new_child_transport = parent_bzrdir.transport.clone('child2')
459
 
        return child_branch, new_child_transport
460
 
 
461
 
    def test_clone_on_transport_obeys_stacking_policy(self):
462
 
        child_branch, new_child_transport = self.prepare_default_stacking()
463
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
464
 
        self.assertEqual(child_branch.base,
465
 
                         new_child.open_branch().get_stacked_on_url())
466
 
 
467
 
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
468
 
        # Make stackable source branch with an unstackable repo format.
469
 
        source_bzrdir = self.make_bzrdir('source')
470
 
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
471
 
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
472
 
        # Make a directory with a default stacking policy
473
 
        parent_bzrdir = self.make_bzrdir('parent')
474
 
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
475
 
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
476
 
        # Clone source into directory
477
 
        target = source_bzrdir.clone(self.get_url('parent/target'))
478
 
 
479
 
    def test_sprout_obeys_stacking_policy(self):
480
 
        child_branch, new_child_transport = self.prepare_default_stacking()
481
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
482
 
        self.assertEqual(child_branch.base,
483
 
                         new_child.open_branch().get_stacked_on_url())
484
 
 
485
 
    def test_clone_ignores_policy_for_unsupported_formats(self):
486
 
        child_branch, new_child_transport = self.prepare_default_stacking(
487
 
            child_format='pack-0.92')
488
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
489
 
        self.assertRaises(errors.UnstackableBranchFormat,
490
 
                          new_child.open_branch().get_stacked_on_url)
491
 
 
492
 
    def test_sprout_ignores_policy_for_unsupported_formats(self):
493
 
        child_branch, new_child_transport = self.prepare_default_stacking(
494
 
            child_format='pack-0.92')
495
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
496
 
        self.assertRaises(errors.UnstackableBranchFormat,
497
 
                          new_child.open_branch().get_stacked_on_url)
498
 
 
499
 
    def test_sprout_upgrades_format_if_stacked_specified(self):
500
 
        child_branch, new_child_transport = self.prepare_default_stacking(
501
 
            child_format='pack-0.92')
502
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
503
 
                                               stacked=True)
504
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
505
 
                         new_child.open_branch().get_stacked_on_url())
506
 
        repo = new_child.open_repository()
507
 
        self.assertTrue(repo._format.supports_external_lookups)
508
 
        self.assertFalse(repo.supports_rich_root())
509
 
 
510
 
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
511
 
        child_branch, new_child_transport = self.prepare_default_stacking(
512
 
            child_format='pack-0.92')
513
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
514
 
            stacked_on=child_branch.bzrdir.root_transport.base)
515
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
516
 
                         new_child.open_branch().get_stacked_on_url())
517
 
        repo = new_child.open_repository()
518
 
        self.assertTrue(repo._format.supports_external_lookups)
519
 
        self.assertFalse(repo.supports_rich_root())
520
 
 
521
 
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
522
 
        child_branch, new_child_transport = self.prepare_default_stacking(
523
 
            child_format='rich-root-pack')
524
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
525
 
                                               stacked=True)
526
 
        repo = new_child.open_repository()
527
 
        self.assertTrue(repo._format.supports_external_lookups)
528
 
        self.assertTrue(repo.supports_rich_root())
529
 
 
530
 
    def test_add_fallback_repo_handles_absolute_urls(self):
531
 
        stack_on = self.make_branch('stack_on', format='1.6')
532
 
        repo = self.make_repository('repo', format='1.6')
533
 
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
534
 
        policy._add_fallback(repo)
535
 
 
536
 
    def test_add_fallback_repo_handles_relative_urls(self):
537
 
        stack_on = self.make_branch('stack_on', format='1.6')
538
 
        repo = self.make_repository('repo', format='1.6')
539
 
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
540
 
        policy._add_fallback(repo)
541
 
 
542
 
    def test_configure_relative_branch_stacking_url(self):
543
 
        stack_on = self.make_branch('stack_on', format='1.6')
544
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
545
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
546
 
            '.', stack_on.base)
547
 
        policy.configure_branch(stacked)
548
 
        self.assertEqual('..', stacked.get_stacked_on_url())
549
 
 
550
 
    def test_relative_branch_stacking_to_absolute(self):
551
 
        stack_on = self.make_branch('stack_on', format='1.6')
552
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
553
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
554
 
            '.', self.get_readonly_url('stack_on'))
555
 
        policy.configure_branch(stacked)
556
 
        self.assertEqual(self.get_readonly_url('stack_on'),
557
 
                         stacked.get_stacked_on_url())
558
 
 
559
 
 
560
416
class ChrootedTests(TestCaseWithTransport):
561
417
    """A support class that provides readonly urls outside the local namespace.
562
418
 
567
423
 
568
424
    def setUp(self):
569
425
        super(ChrootedTests, self).setUp()
570
 
        if not self.vfs_transport_factory == memory.MemoryServer:
571
 
            self.transport_readonly_server = http_server.HttpServer
572
 
 
573
 
    def local_branch_path(self, branch):
574
 
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
426
        if not self.vfs_transport_factory == MemoryServer:
 
427
            self.transport_readonly_server = HttpServer
575
428
 
576
429
    def test_open_containing(self):
577
430
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
584
437
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
585
438
        self.assertEqual('g/p/q', relpath)
586
439
 
587
 
    def test_open_containing_tree_branch_or_repository_empty(self):
588
 
        self.assertRaises(errors.NotBranchError,
589
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
590
 
            self.get_readonly_url(''))
591
 
 
592
 
    def test_open_containing_tree_branch_or_repository_all(self):
593
 
        self.make_branch_and_tree('topdir')
594
 
        tree, branch, repo, relpath = \
595
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
596
 
                'topdir/foo')
597
 
        self.assertEqual(os.path.realpath('topdir'),
598
 
                         os.path.realpath(tree.basedir))
599
 
        self.assertEqual(os.path.realpath('topdir'),
600
 
                         self.local_branch_path(branch))
601
 
        self.assertEqual(
602
 
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
603
 
            repo.bzrdir.transport.local_abspath('repository'))
604
 
        self.assertEqual(relpath, 'foo')
605
 
 
606
 
    def test_open_containing_tree_branch_or_repository_no_tree(self):
607
 
        self.make_branch('branch')
608
 
        tree, branch, repo, relpath = \
609
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
610
 
                'branch/foo')
611
 
        self.assertEqual(tree, None)
612
 
        self.assertEqual(os.path.realpath('branch'),
613
 
                         self.local_branch_path(branch))
614
 
        self.assertEqual(
615
 
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
616
 
            repo.bzrdir.transport.local_abspath('repository'))
617
 
        self.assertEqual(relpath, 'foo')
618
 
 
619
 
    def test_open_containing_tree_branch_or_repository_repo(self):
620
 
        self.make_repository('repo')
621
 
        tree, branch, repo, relpath = \
622
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
623
 
                'repo')
624
 
        self.assertEqual(tree, None)
625
 
        self.assertEqual(branch, None)
626
 
        self.assertEqual(
627
 
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
628
 
            repo.bzrdir.transport.local_abspath('repository'))
629
 
        self.assertEqual(relpath, '')
630
 
 
631
 
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
632
 
        self.make_repository('shared', shared=True)
633
 
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
634
 
                                                force_new_tree=False)
635
 
        tree, branch, repo, relpath = \
636
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
637
 
                'shared/branch')
638
 
        self.assertEqual(tree, None)
639
 
        self.assertEqual(os.path.realpath('shared/branch'),
640
 
                         self.local_branch_path(branch))
641
 
        self.assertEqual(
642
 
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
643
 
            repo.bzrdir.transport.local_abspath('repository'))
644
 
        self.assertEqual(relpath, '')
645
 
 
646
 
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
647
 
        self.make_branch_and_tree('foo')
648
 
        self.build_tree(['foo/bar/'])
649
 
        tree, branch, repo, relpath = \
650
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
651
 
                'foo/bar')
652
 
        self.assertEqual(os.path.realpath('foo'),
653
 
                         os.path.realpath(tree.basedir))
654
 
        self.assertEqual(os.path.realpath('foo'),
655
 
                         self.local_branch_path(branch))
656
 
        self.assertEqual(
657
 
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
658
 
            repo.bzrdir.transport.local_abspath('repository'))
659
 
        self.assertEqual(relpath, 'bar')
660
 
 
661
 
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
662
 
        self.make_repository('bar')
663
 
        self.build_tree(['bar/baz/'])
664
 
        tree, branch, repo, relpath = \
665
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
666
 
                'bar/baz')
667
 
        self.assertEqual(tree, None)
668
 
        self.assertEqual(branch, None)
669
 
        self.assertEqual(
670
 
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
671
 
            repo.bzrdir.transport.local_abspath('repository'))
672
 
        self.assertEqual(relpath, 'baz')
673
 
 
674
440
    def test_open_containing_from_transport(self):
675
441
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
676
442
                          get_transport(self.get_readonly_url('')))
685
451
        self.assertEqual('g/p/q', relpath)
686
452
 
687
453
    def test_open_containing_tree_or_branch(self):
 
454
        def local_branch_path(branch):
 
455
             return os.path.realpath(
 
456
                urlutils.local_path_from_url(branch.base))
 
457
 
688
458
        self.make_branch_and_tree('topdir')
689
459
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
690
460
            'topdir/foo')
691
461
        self.assertEqual(os.path.realpath('topdir'),
692
462
                         os.path.realpath(tree.basedir))
693
463
        self.assertEqual(os.path.realpath('topdir'),
694
 
                         self.local_branch_path(branch))
 
464
                         local_branch_path(branch))
695
465
        self.assertIs(tree.bzrdir, branch.bzrdir)
696
466
        self.assertEqual('foo', relpath)
697
467
        # opening from non-local should not return the tree
705
475
            'topdir/foo')
706
476
        self.assertIs(tree, None)
707
477
        self.assertEqual(os.path.realpath('topdir/foo'),
708
 
                         self.local_branch_path(branch))
 
478
                         local_branch_path(branch))
709
479
        self.assertEqual('', relpath)
710
480
 
711
 
    def test_open_tree_or_branch(self):
712
 
        self.make_branch_and_tree('topdir')
713
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
714
 
        self.assertEqual(os.path.realpath('topdir'),
715
 
                         os.path.realpath(tree.basedir))
716
 
        self.assertEqual(os.path.realpath('topdir'),
717
 
                         self.local_branch_path(branch))
718
 
        self.assertIs(tree.bzrdir, branch.bzrdir)
719
 
        # opening from non-local should not return the tree
720
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
721
 
            self.get_readonly_url('topdir'))
722
 
        self.assertEqual(None, tree)
723
 
        # without a tree:
724
 
        self.make_branch('topdir/foo')
725
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
726
 
        self.assertIs(tree, None)
727
 
        self.assertEqual(os.path.realpath('topdir/foo'),
728
 
                         self.local_branch_path(branch))
729
 
 
730
481
    def test_open_from_transport(self):
731
482
        # transport pointing at bzrdir should give a bzrdir with root transport
732
483
        # set to the given transport
735
486
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
736
487
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
737
488
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
738
 
 
 
489
        
739
490
    def test_open_from_transport_no_bzrdir(self):
740
491
        transport = get_transport(self.get_url())
741
492
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
750
501
                          transport)
751
502
 
752
503
    def test_sprout_recursive(self):
753
 
        tree = self.make_branch_and_tree('tree1',
754
 
                                         format='dirstate-with-subtree')
 
504
        tree = self.make_branch_and_tree('tree1', format='dirstate-with-subtree')
755
505
        sub_tree = self.make_branch_and_tree('tree1/subtree',
756
506
            format='dirstate-with-subtree')
757
 
        sub_tree.set_root_id('subtree-root')
758
507
        tree.add_reference(sub_tree)
759
508
        self.build_tree(['tree1/subtree/file'])
760
509
        sub_tree.add('file')
761
510
        tree.commit('Initial commit')
762
 
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
763
 
        tree2.lock_read()
764
 
        self.addCleanup(tree2.unlock)
 
511
        tree.bzrdir.sprout('tree2')
765
512
        self.failUnlessExists('tree2/subtree/file')
766
 
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
767
513
 
768
514
    def test_cloning_metadir(self):
769
515
        """Ensure that cloning metadir is suitable"""
791
537
        self.failUnlessExists('repo/tree2/subtree')
792
538
        self.failIfExists('repo/tree2/subtree/file')
793
539
 
794
 
    def make_foo_bar_baz(self):
795
 
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
796
 
        bar = self.make_branch('foo/bar').bzrdir
797
 
        baz = self.make_branch('baz').bzrdir
798
 
        return foo, bar, baz
799
 
 
800
 
    def test_find_bzrdirs(self):
801
 
        foo, bar, baz = self.make_foo_bar_baz()
802
 
        transport = get_transport(self.get_url())
803
 
        self.assertEqualBzrdirs([baz, foo, bar],
804
 
                                bzrdir.BzrDir.find_bzrdirs(transport))
805
 
 
806
 
    def test_find_bzrdirs_list_current(self):
807
 
        def list_current(transport):
808
 
            return [s for s in transport.list_dir('') if s != 'baz']
809
 
 
810
 
        foo, bar, baz = self.make_foo_bar_baz()
811
 
        transport = get_transport(self.get_url())
812
 
        self.assertEqualBzrdirs([foo, bar],
813
 
                                bzrdir.BzrDir.find_bzrdirs(transport,
814
 
                                    list_current=list_current))
815
 
 
816
 
 
817
 
    def test_find_bzrdirs_evaluate(self):
818
 
        def evaluate(bzrdir):
819
 
            try:
820
 
                repo = bzrdir.open_repository()
821
 
            except NoRepositoryPresent:
822
 
                return True, bzrdir.root_transport.base
823
 
            else:
824
 
                return False, bzrdir.root_transport.base
825
 
 
826
 
        foo, bar, baz = self.make_foo_bar_baz()
827
 
        transport = get_transport(self.get_url())
828
 
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
829
 
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
830
 
                                                         evaluate=evaluate)))
831
 
 
832
 
    def assertEqualBzrdirs(self, first, second):
833
 
        first = list(first)
834
 
        second = list(second)
835
 
        self.assertEqual(len(first), len(second))
836
 
        for x, y in zip(first, second):
837
 
            self.assertEqual(x.root_transport.base, y.root_transport.base)
838
 
 
839
 
    def test_find_branches(self):
840
 
        root = self.make_repository('', shared=True)
841
 
        foo, bar, baz = self.make_foo_bar_baz()
842
 
        qux = self.make_bzrdir('foo/qux')
843
 
        transport = get_transport(self.get_url())
844
 
        branches = bzrdir.BzrDir.find_branches(transport)
845
 
        self.assertEqual(baz.root_transport.base, branches[0].base)
846
 
        self.assertEqual(foo.root_transport.base, branches[1].base)
847
 
        self.assertEqual(bar.root_transport.base, branches[2].base)
848
 
 
849
 
        # ensure this works without a top-level repo
850
 
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
851
 
        self.assertEqual(foo.root_transport.base, branches[0].base)
852
 
        self.assertEqual(bar.root_transport.base, branches[1].base)
853
 
 
854
540
 
855
541
class TestMeta1DirFormat(TestCaseWithTransport):
856
542
    """Tests specific to the meta1 dir format."""
895
581
 
896
582
    def test_needs_conversion_different_working_tree(self):
897
583
        # meta1dirs need an conversion if any element is not the default.
898
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
899
 
        tree = self.make_branch_and_tree('tree', format='knit')
900
 
        self.assertTrue(tree.bzrdir.needs_format_conversion(
901
 
            new_format))
902
 
 
903
 
    def test_initialize_on_format_uses_smart_transport(self):
904
 
        self.setup_smart_server_with_call_log()
905
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
906
 
        transport = self.get_transport('target')
907
 
        transport.ensure_base()
908
 
        self.reset_smart_call_log()
909
 
        instance = new_format.initialize_on_transport(transport)
910
 
        self.assertIsInstance(instance, remote.RemoteBzrDir)
911
 
        rpc_count = len(self.hpss_calls)
912
 
        # This figure represent the amount of work to perform this use case. It
913
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
914
 
        # being too low. If rpc_count increases, more network roundtrips have
915
 
        # become necessary for this use case. Please do not adjust this number
916
 
        # upwards without agreement from bzr's network support maintainers.
917
 
        self.assertEqual(2, rpc_count)
 
584
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
585
        # test with 
 
586
        new_default = bzrdir.format_registry.make_bzrdir('dirstate')
 
587
        bzrdir.BzrDirFormat._set_default_format(new_default)
 
588
        try:
 
589
            tree = self.make_branch_and_tree('tree', format='knit')
 
590
            self.assertTrue(tree.bzrdir.needs_format_conversion())
 
591
        finally:
 
592
            bzrdir.BzrDirFormat._set_default_format(old_format)
918
593
 
919
594
 
920
595
class TestFormat5(TestCaseWithTransport):
921
596
    """Tests specific to the version 5 bzrdir format."""
922
597
 
923
598
    def test_same_lockfiles_between_tree_repo_branch(self):
924
 
        # this checks that only a single lockfiles instance is created
 
599
        # this checks that only a single lockfiles instance is created 
925
600
        # for format 5 objects
926
601
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
927
602
        def check_dir_components_use_same_lock(dir):
934
609
        # and if we open it normally.
935
610
        dir = bzrdir.BzrDir.open(self.get_url())
936
611
        check_dir_components_use_same_lock(dir)
937
 
 
 
612
    
938
613
    def test_can_convert(self):
939
614
        # format 5 dirs are convertable
940
615
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
941
616
        self.assertTrue(dir.can_convert_format())
942
 
 
 
617
    
943
618
    def test_needs_conversion(self):
944
 
        # format 5 dirs need a conversion if they are not the default,
945
 
        # and they aren't
946
 
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
947
 
        # don't need to convert it to itself
948
 
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
949
 
        # do need to convert it to the current default
950
 
        self.assertTrue(dir.needs_format_conversion(
951
 
            bzrdir.BzrDirFormat.get_default_format()))
 
619
        # format 5 dirs need a conversion if they are not the default.
 
620
        # and they start of not the default.
 
621
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
622
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
 
623
        try:
 
624
            dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
625
            self.assertFalse(dir.needs_format_conversion())
 
626
        finally:
 
627
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
628
        self.assertTrue(dir.needs_format_conversion())
952
629
 
953
630
 
954
631
class TestFormat6(TestCaseWithTransport):
955
632
    """Tests specific to the version 6 bzrdir format."""
956
633
 
957
634
    def test_same_lockfiles_between_tree_repo_branch(self):
958
 
        # this checks that only a single lockfiles instance is created
 
635
        # this checks that only a single lockfiles instance is created 
959
636
        # for format 6 objects
960
637
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
961
638
        def check_dir_components_use_same_lock(dir):
968
645
        # and if we open it normally.
969
646
        dir = bzrdir.BzrDir.open(self.get_url())
970
647
        check_dir_components_use_same_lock(dir)
971
 
 
 
648
    
972
649
    def test_can_convert(self):
973
650
        # format 6 dirs are convertable
974
651
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
975
652
        self.assertTrue(dir.can_convert_format())
976
 
 
 
653
    
977
654
    def test_needs_conversion(self):
978
655
        # format 6 dirs need an conversion if they are not the default.
979
 
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
980
 
        self.assertTrue(dir.needs_format_conversion(
981
 
            bzrdir.BzrDirFormat.get_default_format()))
 
656
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
657
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
 
658
        try:
 
659
            dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
660
            self.assertTrue(dir.needs_format_conversion())
 
661
        finally:
 
662
            bzrdir.BzrDirFormat._set_default_format(old_format)
982
663
 
983
664
 
984
665
class NotBzrDir(bzrlib.bzrdir.BzrDir):
1015
696
 
1016
697
class TestNotBzrDir(TestCaseWithTransport):
1017
698
    """Tests for using the bzrdir api with a non .bzr based disk format.
1018
 
 
 
699
    
1019
700
    If/when one of these is in the core, we can let the implementation tests
1020
701
    verify this works.
1021
702
    """
1022
703
 
1023
704
    def test_create_and_find_format(self):
1024
 
        # create a .notbzr dir
 
705
        # create a .notbzr dir 
1025
706
        format = NotBzrDirFormat()
1026
707
        dir = format.initialize(self.get_url())
1027
708
        self.assertIsInstance(dir, NotBzrDir)
1051
732
 
1052
733
    def setUp(self):
1053
734
        super(NonLocalTests, self).setUp()
1054
 
        self.vfs_transport_factory = memory.MemoryServer
1055
 
 
 
735
        self.vfs_transport_factory = MemoryServer
 
736
    
1056
737
    def test_create_branch_convenience(self):
1057
738
        # outside a repo the default convenience output is a repo+branch_tree
1058
739
        format = bzrdir.format_registry.make_bzrdir('knit')
1095
776
                              workingtree.WorkingTreeFormat3)
1096
777
 
1097
778
 
1098
 
class TestHTTPRedirections(object):
1099
 
    """Test redirection between two http servers.
 
779
class TestHTTPRedirectionLoop(object):
 
780
    """Test redirection loop between two http servers.
1100
781
 
1101
782
    This MUST be used by daughter classes that also inherit from
1102
783
    TestCaseWithTwoWebservers.
1103
784
 
1104
785
    We can't inherit directly from TestCaseWithTwoWebservers or the
1105
786
    test framework will try to create an instance which cannot
1106
 
    run, its implementation being incomplete.
 
787
    run, its implementation being incomplete. 
1107
788
    """
1108
789
 
 
790
    # Should be defined by daughter classes to ensure redirection
 
791
    # still use the same transport implementation (not currently
 
792
    # enforced as it's a bit tricky to get right (see the FIXME
 
793
    # in BzrDir.open_from_transport for the unique use case so
 
794
    # far)
 
795
    _qualifier = None
 
796
 
1109
797
    def create_transport_readonly_server(self):
1110
 
        return http_utils.HTTPServerRedirecting()
 
798
        return HTTPServerRedirecting()
1111
799
 
1112
800
    def create_transport_secondary_server(self):
1113
 
        return http_utils.HTTPServerRedirecting()
 
801
        return HTTPServerRedirecting()
1114
802
 
1115
803
    def setUp(self):
1116
 
        super(TestHTTPRedirections, self).setUp()
 
804
        # Both servers redirect to each server creating a loop
 
805
        super(TestHTTPRedirectionLoop, self).setUp()
1117
806
        # The redirections will point to the new server
1118
807
        self.new_server = self.get_readonly_server()
1119
808
        # The requests to the old server will be redirected
1120
809
        self.old_server = self.get_secondary_server()
1121
810
        # Configure the redirections
1122
811
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
812
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
813
 
 
814
    def _qualified_url(self, host, port):
 
815
        return 'http+%s://%s:%s' % (self._qualifier, host, port)
1123
816
 
1124
817
    def test_loop(self):
1125
 
        # Both servers redirect to each other creating a loop
1126
 
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1127
818
        # Starting from either server should loop
1128
 
        old_url = self._qualified_url(self.old_server.host,
 
819
        old_url = self._qualified_url(self.old_server.host, 
1129
820
                                      self.old_server.port)
1130
821
        oldt = self._transport(old_url)
1131
822
        self.assertRaises(errors.NotBranchError,
1132
823
                          bzrdir.BzrDir.open_from_transport, oldt)
1133
 
        new_url = self._qualified_url(self.new_server.host,
 
824
        new_url = self._qualified_url(self.new_server.host, 
1134
825
                                      self.new_server.port)
1135
826
        newt = self._transport(new_url)
1136
827
        self.assertRaises(errors.NotBranchError,
1137
828
                          bzrdir.BzrDir.open_from_transport, newt)
1138
829
 
1139
 
    def test_qualifier_preserved(self):
1140
 
        wt = self.make_branch_and_tree('branch')
1141
 
        old_url = self._qualified_url(self.old_server.host,
1142
 
                                      self.old_server.port)
1143
 
        start = self._transport(old_url).clone('branch')
1144
 
        bdir = bzrdir.BzrDir.open_from_transport(start)
1145
 
        # Redirection should preserve the qualifier, hence the transport class
1146
 
        # itself.
1147
 
        self.assertIsInstance(bdir.root_transport, type(start))
1148
 
 
1149
 
 
1150
 
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1151
 
                                  http_utils.TestCaseWithTwoWebservers):
 
830
 
 
831
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
 
832
                                  TestCaseWithTwoWebservers):
1152
833
    """Tests redirections for urllib implementation"""
1153
834
 
 
835
    _qualifier = 'urllib'
1154
836
    _transport = HttpTransport_urllib
1155
837
 
1156
 
    def _qualified_url(self, host, port):
1157
 
        result = 'http+urllib://%s:%s' % (host, port)
1158
 
        self.permit_url(result)
1159
 
        return result
1160
 
 
1161
838
 
1162
839
 
1163
840
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1164
 
                                  TestHTTPRedirections,
1165
 
                                  http_utils.TestCaseWithTwoWebservers):
 
841
                                  TestHTTPRedirectionLoop,
 
842
                                  TestCaseWithTwoWebservers):
1166
843
    """Tests redirections for pycurl implementation"""
1167
844
 
1168
 
    def _qualified_url(self, host, port):
1169
 
        result = 'http+pycurl://%s:%s' % (host, port)
1170
 
        self.permit_url(result)
1171
 
        return result
1172
 
 
1173
 
 
1174
 
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1175
 
                                  http_utils.TestCaseWithTwoWebservers):
1176
 
    """Tests redirections for the nosmart decorator"""
1177
 
 
1178
 
    _transport = NoSmartTransportDecorator
1179
 
 
1180
 
    def _qualified_url(self, host, port):
1181
 
        result = 'nosmart+http://%s:%s' % (host, port)
1182
 
        self.permit_url(result)
1183
 
        return result
1184
 
 
1185
 
 
1186
 
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1187
 
                                    http_utils.TestCaseWithTwoWebservers):
1188
 
    """Tests redirections for readonly decoratror"""
1189
 
 
1190
 
    _transport = ReadonlyTransportDecorator
1191
 
 
1192
 
    def _qualified_url(self, host, port):
1193
 
        result = 'readonly+http://%s:%s' % (host, port)
1194
 
        self.permit_url(result)
1195
 
        return result
1196
 
 
1197
 
 
1198
 
class TestDotBzrHidden(TestCaseWithTransport):
1199
 
 
1200
 
    ls = ['ls']
1201
 
    if sys.platform == 'win32':
1202
 
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1203
 
 
1204
 
    def get_ls(self):
1205
 
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1206
 
            stderr=subprocess.PIPE)
1207
 
        out, err = f.communicate()
1208
 
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1209
 
                         % (self.ls, err))
1210
 
        return out.splitlines()
1211
 
 
1212
 
    def test_dot_bzr_hidden(self):
1213
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1214
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1215
 
        b = bzrdir.BzrDir.create('.')
1216
 
        self.build_tree(['a'])
1217
 
        self.assertEquals(['a'], self.get_ls())
1218
 
 
1219
 
    def test_dot_bzr_hidden_with_url(self):
1220
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1221
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1222
 
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1223
 
        self.build_tree(['a'])
1224
 
        self.assertEquals(['a'], self.get_ls())
1225
 
 
1226
 
 
1227
 
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1228
 
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
1229
 
 
1230
 
    def _open(self, transport):
1231
 
        return _TestBzrDir(transport, self)
1232
 
 
1233
 
 
1234
 
class _TestBzrDir(bzrdir.BzrDirMeta1):
1235
 
    """Test BzrDir implementation for TestBzrDirSprout.
1236
 
 
1237
 
    When created a _TestBzrDir already has repository and a branch.  The branch
1238
 
    is a test double as well.
1239
 
    """
1240
 
 
1241
 
    def __init__(self, *args, **kwargs):
1242
 
        super(_TestBzrDir, self).__init__(*args, **kwargs)
1243
 
        self.test_branch = _TestBranch()
1244
 
        self.test_branch.repository = self.create_repository()
1245
 
 
1246
 
    def open_branch(self, unsupported=False):
1247
 
        return self.test_branch
1248
 
 
1249
 
    def cloning_metadir(self, require_stacking=False):
1250
 
        return _TestBzrDirFormat()
1251
 
 
1252
 
 
1253
 
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1254
 
    """Test Branch format for TestBzrDirSprout."""
1255
 
 
1256
 
 
1257
 
class _TestBranch(bzrlib.branch.Branch):
1258
 
    """Test Branch implementation for TestBzrDirSprout."""
1259
 
 
1260
 
    def __init__(self, *args, **kwargs):
1261
 
        self._format = _TestBranchFormat()
1262
 
        super(_TestBranch, self).__init__(*args, **kwargs)
1263
 
        self.calls = []
1264
 
        self._parent = None
1265
 
 
1266
 
    def sprout(self, *args, **kwargs):
1267
 
        self.calls.append('sprout')
1268
 
        return _TestBranch()
1269
 
 
1270
 
    def copy_content_into(self, destination, revision_id=None):
1271
 
        self.calls.append('copy_content_into')
1272
 
 
1273
 
    def get_parent(self):
1274
 
        return self._parent
1275
 
 
1276
 
    def set_parent(self, parent):
1277
 
        self._parent = parent
1278
 
 
1279
 
 
1280
 
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1281
 
 
1282
 
    def test_sprout_uses_branch_sprout(self):
1283
 
        """BzrDir.sprout calls Branch.sprout.
1284
 
 
1285
 
        Usually, BzrDir.sprout should delegate to the branch's sprout method
1286
 
        for part of the work.  This allows the source branch to control the
1287
 
        choice of format for the new branch.
1288
 
 
1289
 
        There are exceptions, but this tests avoids them:
1290
 
          - if there's no branch in the source bzrdir,
1291
 
          - or if the stacking has been requested and the format needs to be
1292
 
            overridden to satisfy that.
1293
 
        """
1294
 
        # Make an instrumented bzrdir.
1295
 
        t = self.get_transport('source')
1296
 
        t.ensure_base()
1297
 
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1298
 
        # The instrumented bzrdir has a test_branch attribute that logs calls
1299
 
        # made to the branch contained in that bzrdir.  Initially the test
1300
 
        # branch exists but no calls have been made to it.
1301
 
        self.assertEqual([], source_bzrdir.test_branch.calls)
1302
 
 
1303
 
        # Sprout the bzrdir
1304
 
        target_url = self.get_url('target')
1305
 
        result = source_bzrdir.sprout(target_url, recurse='no')
1306
 
 
1307
 
        # The bzrdir called the branch's sprout method.
1308
 
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1309
 
 
1310
 
    def test_sprout_parent(self):
1311
 
        grandparent_tree = self.make_branch('grandparent')
1312
 
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1313
 
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
1314
 
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1315
 
 
1316
 
 
1317
 
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1318
 
 
1319
 
    def test_pre_open_called(self):
1320
 
        calls = []
1321
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1322
 
        transport = self.get_transport('foo')
1323
 
        url = transport.base
1324
 
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1325
 
        self.assertEqual([transport.base], [t.base for t in calls])
1326
 
 
1327
 
    def test_pre_open_actual_exceptions_raised(self):
1328
 
        count = [0]
1329
 
        def fail_once(transport):
1330
 
            count[0] += 1
1331
 
            if count[0] == 1:
1332
 
                raise errors.BzrError("fail")
1333
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1334
 
        transport = self.get_transport('foo')
1335
 
        url = transport.base
1336
 
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1337
 
        self.assertEqual('fail', err._preformatted_string)
 
845
    _qualifier = 'pycurl'