~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Robert Collins
  • Date: 2007-07-12 12:07:13 UTC
  • mto: This revision was merged to the branch mainline in revision 2610.
  • Revision ID: robertc@robertcollins.net-20070712120713-c2vfm2n61l63wt6s
(robertc) Introduce a pack command.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
 
#
 
1
    # Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for the BzrDir facility and any format specific tests.
18
18
 
19
 
For interface contract tests, see tests/per_bzr_dir.
 
19
For interface contract tests, see tests/bzr_dir_implementations.
20
20
"""
21
21
 
22
 
import os
23
 
import subprocess
24
 
import sys
 
22
import os.path
 
23
from StringIO import StringIO
25
24
 
26
25
from bzrlib import (
27
26
    bzrdir,
28
27
    errors,
29
28
    help_topics,
30
29
    repository,
31
 
    osutils,
32
 
    remote,
 
30
    symbol_versioning,
33
31
    urlutils,
34
 
    win32utils,
35
32
    workingtree,
36
33
    )
37
34
import bzrlib.branch
41
38
                           )
42
39
from bzrlib.tests import (
43
40
    TestCase,
44
 
    TestCaseWithMemoryTransport,
45
41
    TestCaseWithTransport,
46
 
    TestSkipped,
 
42
    test_sftp_transport
47
43
    )
48
 
from bzrlib.tests import(
49
 
    http_server,
50
 
    http_utils,
 
44
from bzrlib.tests.HttpServer import HttpServer
 
45
from bzrlib.tests.HTTPTestUtil import (
 
46
    TestCaseWithTwoWebservers,
 
47
    HTTPServerRedirecting,
51
48
    )
52
49
from bzrlib.tests.test_http import TestWithTransport_pycurl
53
50
from bzrlib.transport import get_transport
54
51
from bzrlib.transport.http._urllib import HttpTransport_urllib
55
52
from bzrlib.transport.memory import MemoryServer
56
 
from bzrlib.transport.nosmart import NoSmartTransportDecorator
57
 
from bzrlib.transport.readonly import ReadonlyTransportDecorator
58
 
from bzrlib.repofmt import knitrepo, weaverepo, pack_repo
 
53
from bzrlib.repofmt import knitrepo, weaverepo
59
54
 
60
55
 
61
56
class TestDefaultFormat(TestCase):
64
59
        old_format = bzrdir.BzrDirFormat.get_default_format()
65
60
        # default is BzrDirFormat6
66
61
        self.failUnless(isinstance(old_format, bzrdir.BzrDirMetaFormat1))
67
 
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
 
62
        self.applyDeprecated(symbol_versioning.zero_fourteen, 
 
63
                             bzrdir.BzrDirFormat.set_default_format, 
 
64
                             SampleBzrDirFormat())
68
65
        # creating a bzr dir should now create an instrumented dir.
69
66
        try:
70
67
            result = bzrdir.BzrDir.create('memory:///')
71
68
            self.failUnless(isinstance(result, SampleBzrDir))
72
69
        finally:
73
 
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
70
            self.applyDeprecated(symbol_versioning.zero_fourteen,
 
71
                bzrdir.BzrDirFormat.set_default_format, old_format)
74
72
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
75
73
 
76
74
 
81
79
        my_format_registry.register('weave', bzrdir.BzrDirFormat6,
82
80
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
83
81
            ' repositories', deprecated=True)
84
 
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir',
 
82
        my_format_registry.register_lazy('lazy', 'bzrlib.bzrdir', 
85
83
            'BzrDirFormat6', 'Format registered lazily', deprecated=True)
86
84
        my_format_registry.register_metadir('knit',
87
85
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
92
90
            'branch6',
93
91
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
94
92
            'Experimental successor to knit.  Use at your own risk.',
95
 
            branch_format='bzrlib.branch.BzrBranchFormat6',
96
 
            experimental=True)
 
93
            branch_format='bzrlib.branch.BzrBranchFormat6')
97
94
        my_format_registry.register_metadir(
98
95
            'hidden format',
99
96
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3',
114
111
        my_bzrdir = my_format_registry.make_bzrdir('weave')
115
112
        self.assertIsInstance(my_bzrdir, bzrdir.BzrDirFormat6)
116
113
        my_bzrdir = my_format_registry.make_bzrdir('default')
117
 
        self.assertIsInstance(my_bzrdir.repository_format,
 
114
        self.assertIsInstance(my_bzrdir.repository_format, 
118
115
            knitrepo.RepositoryFormatKnit1)
119
116
        my_bzrdir = my_format_registry.make_bzrdir('knit')
120
 
        self.assertIsInstance(my_bzrdir.repository_format,
 
117
        self.assertIsInstance(my_bzrdir.repository_format, 
121
118
            knitrepo.RepositoryFormatKnit1)
122
119
        my_bzrdir = my_format_registry.make_bzrdir('branch6')
123
120
        self.assertIsInstance(my_bzrdir.get_branch_format(),
127
124
        my_format_registry = self.make_format_registry()
128
125
        self.assertEqual('Format registered lazily',
129
126
                         my_format_registry.get_help('lazy'))
130
 
        self.assertEqual('Format using knits',
 
127
        self.assertEqual('Format using knits', 
131
128
                         my_format_registry.get_help('knit'))
132
 
        self.assertEqual('Format using knits',
 
129
        self.assertEqual('Format using knits', 
133
130
                         my_format_registry.get_help('default'))
134
131
        self.assertEqual('Pre-0.8 format.  Slower and does not support'
135
 
                         ' checkouts or shared repositories',
 
132
                         ' checkouts or shared repositories', 
136
133
                         my_format_registry.get_help('weave'))
137
 
 
 
134
        
138
135
    def test_help_topic(self):
139
136
        topics = help_topics.HelpTopicRegistry()
140
 
        registry = self.make_format_registry()
141
 
        topics.register('current-formats', registry.help_topic,
142
 
                        'Current formats')
143
 
        topics.register('other-formats', registry.help_topic,
144
 
                        'Other formats')
145
 
        new = topics.get_detail('current-formats')
146
 
        rest = topics.get_detail('other-formats')
147
 
        experimental, deprecated = rest.split('Deprecated formats')
148
 
        self.assertContainsRe(new, 'formats-help')
149
 
        self.assertContainsRe(new,
150
 
                ':knit:\n    \(native\) \(default\) Format using knits\n')
151
 
        self.assertContainsRe(experimental,
152
 
                ':branch6:\n    \(native\) Experimental successor to knit')
153
 
        self.assertContainsRe(deprecated,
154
 
                ':lazy:\n    \(native\) Format registered lazily\n')
 
137
        topics.register('formats', self.make_format_registry().help_topic, 
 
138
                        'Directory formats')
 
139
        topic = topics.get_detail('formats')
 
140
        new, deprecated = topic.split('Deprecated formats')
 
141
        self.assertContainsRe(new, 'Bazaar directory formats')
 
142
        self.assertContainsRe(new, 
 
143
            '  knit/default:\n    \(native\) Format using knits\n')
 
144
        self.assertContainsRe(deprecated, 
 
145
            '  lazy:\n    \(native\) Format registered lazily\n')
155
146
        self.assertNotContainsRe(new, 'hidden')
156
147
 
157
148
    def test_set_default_repository(self):
168
159
        finally:
169
160
            bzrdir.format_registry.set_default_repository(old_default)
170
161
 
171
 
    def test_aliases(self):
172
 
        a_registry = bzrdir.BzrDirFormatRegistry()
173
 
        a_registry.register('weave', bzrdir.BzrDirFormat6,
174
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
175
 
            ' repositories', deprecated=True)
176
 
        a_registry.register('weavealias', bzrdir.BzrDirFormat6,
177
 
            'Pre-0.8 format.  Slower and does not support checkouts or shared'
178
 
            ' repositories', deprecated=True, alias=True)
179
 
        self.assertEqual(frozenset(['weavealias']), a_registry.aliases())
180
 
 
181
162
 
182
163
class SampleBranch(bzrlib.branch.Branch):
183
164
    """A dummy branch for guess what, dummy use."""
186
167
        self.bzrdir = dir
187
168
 
188
169
 
189
 
class SampleRepository(bzrlib.repository.Repository):
190
 
    """A dummy repo."""
191
 
 
192
 
    def __init__(self, dir):
193
 
        self.bzrdir = dir
194
 
 
195
 
 
196
170
class SampleBzrDir(bzrdir.BzrDir):
197
171
    """A sample BzrDir implementation to allow testing static methods."""
198
172
 
202
176
 
203
177
    def open_repository(self):
204
178
        """See BzrDir.open_repository."""
205
 
        return SampleRepository(self)
 
179
        return "A repository"
206
180
 
207
181
    def create_branch(self):
208
182
        """See BzrDir.create_branch."""
216
190
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
217
191
    """A sample format
218
192
 
219
 
    this format is initializable, unsupported to aid in testing the
 
193
    this format is initializable, unsupported to aid in testing the 
220
194
    open and open_downlevel routines.
221
195
    """
222
196
 
224
198
        """See BzrDirFormat.get_format_string()."""
225
199
        return "Sample .bzr dir format."
226
200
 
227
 
    def initialize_on_transport(self, t):
 
201
    def initialize(self, url):
228
202
        """Create a bzr dir."""
 
203
        t = get_transport(url)
229
204
        t.mkdir('.bzr')
230
205
        t.put_bytes('.bzr/branch-format', self.get_format_string())
231
206
        return SampleBzrDir(t, self)
243
218
    def test_find_format(self):
244
219
        # is the right format object found for a branch?
245
220
        # create a branch with a few known format objects.
246
 
        # this is not quite the same as
 
221
        # this is not quite the same as 
247
222
        t = get_transport(self.get_url())
248
223
        self.build_tree(["foo/", "bar/"], transport=t)
249
224
        def check_format(format, url):
253
228
            self.failUnless(isinstance(found_format, format.__class__))
254
229
        check_format(bzrdir.BzrDirFormat5(), "foo")
255
230
        check_format(bzrdir.BzrDirFormat6(), "bar")
256
 
 
 
231
        
257
232
    def test_find_format_nothing_there(self):
258
233
        self.assertRaises(NotBranchError,
259
234
                          bzrdir.BzrDirFormat.find_format,
286
261
        # now open_downlevel should fail too.
287
262
        self.assertRaises(UnknownFormatError, bzrdir.BzrDir.open_unsupported, url)
288
263
 
 
264
    def test_create_repository(self):
 
265
        format = SampleBzrDirFormat()
 
266
        repo = bzrdir.BzrDir.create_repository(self.get_url(), format=format)
 
267
        self.assertEqual('A repository', repo)
 
268
 
 
269
    def test_create_repository_shared(self):
 
270
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
271
        repo = bzrdir.BzrDir.create_repository('.', shared=True)
 
272
        self.assertTrue(repo.is_shared())
 
273
 
 
274
    def test_create_repository_nonshared(self):
 
275
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
276
        repo = bzrdir.BzrDir.create_repository('.')
 
277
        self.assertFalse(repo.is_shared())
 
278
 
 
279
    def test_create_repository_under_shared(self):
 
280
        # an explicit create_repository always does so.
 
281
        # we trust the format is right from the 'create_repository test'
 
282
        format = bzrdir.format_registry.make_bzrdir('knit')
 
283
        self.make_repository('.', shared=True, format=format)
 
284
        repo = bzrdir.BzrDir.create_repository(self.get_url('child'),
 
285
                                               format=format)
 
286
        self.assertTrue(isinstance(repo, repository.Repository))
 
287
        self.assertTrue(repo.bzrdir.root_transport.base.endswith('child/'))
 
288
 
289
289
    def test_create_branch_and_repo_uses_default(self):
290
290
        format = SampleBzrDirFormat()
291
 
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
 
291
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(), 
292
292
                                                      format=format)
293
293
        self.assertTrue(isinstance(branch, SampleBranch))
294
294
 
303
303
                          branch.bzrdir.open_repository)
304
304
 
305
305
    def test_create_branch_and_repo_under_shared_force_new(self):
306
 
        # creating a branch and repo in a shared repo can be forced to
 
306
        # creating a branch and repo in a shared repo can be forced to 
307
307
        # make a new repo
308
308
        format = bzrdir.format_registry.make_bzrdir('knit')
309
309
        self.make_repository('.', shared=True, format=format)
314
314
 
315
315
    def test_create_standalone_working_tree(self):
316
316
        format = SampleBzrDirFormat()
317
 
        # note this is deliberately readonly, as this failure should
 
317
        # note this is deliberately readonly, as this failure should 
318
318
        # occur before any writes.
319
319
        self.assertRaises(errors.NotLocalUrl,
320
320
                          bzrdir.BzrDir.create_standalone_workingtree,
321
321
                          self.get_readonly_url(), format=format)
322
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
 
322
        tree = bzrdir.BzrDir.create_standalone_workingtree('.', 
323
323
                                                           format=format)
324
324
        self.assertEqual('A tree', tree)
325
325
 
327
327
        # create standalone working tree always makes a repo.
328
328
        format = bzrdir.format_registry.make_bzrdir('knit')
329
329
        self.make_repository('.', shared=True, format=format)
330
 
        # note this is deliberately readonly, as this failure should
 
330
        # note this is deliberately readonly, as this failure should 
331
331
        # occur before any writes.
332
332
        self.assertRaises(errors.NotLocalUrl,
333
333
                          bzrdir.BzrDir.create_standalone_workingtree,
334
334
                          self.get_readonly_url('child'), format=format)
335
 
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
 
335
        tree = bzrdir.BzrDir.create_standalone_workingtree('child', 
336
336
            format=format)
337
337
        tree.bzrdir.open_repository()
338
338
 
343
343
        branch.bzrdir.open_workingtree()
344
344
        branch.bzrdir.open_repository()
345
345
 
346
 
    def test_create_branch_convenience_possible_transports(self):
347
 
        """Check that the optional 'possible_transports' is recognized"""
348
 
        format = bzrdir.format_registry.make_bzrdir('knit')
349
 
        t = self.get_transport()
350
 
        branch = bzrdir.BzrDir.create_branch_convenience(
351
 
            '.', format=format, possible_transports=[t])
352
 
        branch.bzrdir.open_workingtree()
353
 
        branch.bzrdir.open_repository()
354
 
 
355
346
    def test_create_branch_convenience_root(self):
356
347
        """Creating a branch at the root of a fs should work."""
357
348
        self.vfs_transport_factory = MemoryServer
358
349
        # outside a repo the default convenience output is a repo+branch_tree
359
350
        format = bzrdir.format_registry.make_bzrdir('knit')
360
 
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
 
351
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(), 
361
352
                                                         format=format)
362
353
        self.assertRaises(errors.NoWorkingTree,
363
354
                          branch.bzrdir.open_workingtree)
373
364
        branch.bzrdir.open_workingtree()
374
365
        self.assertRaises(errors.NoRepositoryPresent,
375
366
                          branch.bzrdir.open_repository)
376
 
 
 
367
            
377
368
    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
378
369
        # inside a repo the default convenience output is a branch+ follow the
379
370
        # repo tree policy but we can override that
385
376
                          branch.bzrdir.open_workingtree)
386
377
        self.assertRaises(errors.NoRepositoryPresent,
387
378
                          branch.bzrdir.open_repository)
388
 
 
 
379
            
389
380
    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
390
381
        # inside a repo the default convenience output is a branch+ follow the
391
382
        # repo tree policy
392
383
        format = bzrdir.format_registry.make_bzrdir('knit')
393
384
        repo = self.make_repository('.', shared=True, format=format)
394
385
        repo.set_make_working_trees(False)
395
 
        branch = bzrdir.BzrDir.create_branch_convenience('child',
 
386
        branch = bzrdir.BzrDir.create_branch_convenience('child', 
396
387
                                                         format=format)
397
388
        self.assertRaises(errors.NoWorkingTree,
398
389
                          branch.bzrdir.open_workingtree)
422
413
        branch.bzrdir.open_workingtree()
423
414
 
424
415
 
425
 
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
426
 
 
427
 
    def test_acquire_repository_standalone(self):
428
 
        """The default acquisition policy should create a standalone branch."""
429
 
        my_bzrdir = self.make_bzrdir('.')
430
 
        repo_policy = my_bzrdir.determine_repository_policy()
431
 
        repo, is_new = repo_policy.acquire_repository()
432
 
        self.assertEqual(repo.bzrdir.root_transport.base,
433
 
                         my_bzrdir.root_transport.base)
434
 
        self.assertFalse(repo.is_shared())
435
 
 
436
 
    def test_determine_stacking_policy(self):
437
 
        parent_bzrdir = self.make_bzrdir('.')
438
 
        child_bzrdir = self.make_bzrdir('child')
439
 
        parent_bzrdir.get_config().set_default_stack_on('http://example.org')
440
 
        repo_policy = child_bzrdir.determine_repository_policy()
441
 
        self.assertEqual('http://example.org', repo_policy._stack_on)
442
 
 
443
 
    def test_determine_stacking_policy_relative(self):
444
 
        parent_bzrdir = self.make_bzrdir('.')
445
 
        child_bzrdir = self.make_bzrdir('child')
446
 
        parent_bzrdir.get_config().set_default_stack_on('child2')
447
 
        repo_policy = child_bzrdir.determine_repository_policy()
448
 
        self.assertEqual('child2', repo_policy._stack_on)
449
 
        self.assertEqual(parent_bzrdir.root_transport.base,
450
 
                         repo_policy._stack_on_pwd)
451
 
 
452
 
    def prepare_default_stacking(self, child_format='1.6'):
453
 
        parent_bzrdir = self.make_bzrdir('.')
454
 
        child_branch = self.make_branch('child', format=child_format)
455
 
        parent_bzrdir.get_config().set_default_stack_on(child_branch.base)
456
 
        new_child_transport = parent_bzrdir.transport.clone('child2')
457
 
        return child_branch, new_child_transport
458
 
 
459
 
    def test_clone_on_transport_obeys_stacking_policy(self):
460
 
        child_branch, new_child_transport = self.prepare_default_stacking()
461
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
462
 
        self.assertEqual(child_branch.base,
463
 
                         new_child.open_branch().get_stacked_on_url())
464
 
 
465
 
    def test_default_stacking_with_stackable_branch_unstackable_repo(self):
466
 
        # Make stackable source branch with an unstackable repo format.
467
 
        source_bzrdir = self.make_bzrdir('source')
468
 
        pack_repo.RepositoryFormatKnitPack1().initialize(source_bzrdir)
469
 
        source_branch = bzrlib.branch.BzrBranchFormat7().initialize(source_bzrdir)
470
 
        # Make a directory with a default stacking policy
471
 
        parent_bzrdir = self.make_bzrdir('parent')
472
 
        stacked_on = self.make_branch('parent/stacked-on', format='pack-0.92')
473
 
        parent_bzrdir.get_config().set_default_stack_on(stacked_on.base)
474
 
        # Clone source into directory
475
 
        target = source_bzrdir.clone(self.get_url('parent/target'))
476
 
 
477
 
    def test_sprout_obeys_stacking_policy(self):
478
 
        child_branch, new_child_transport = self.prepare_default_stacking()
479
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
480
 
        self.assertEqual(child_branch.base,
481
 
                         new_child.open_branch().get_stacked_on_url())
482
 
 
483
 
    def test_clone_ignores_policy_for_unsupported_formats(self):
484
 
        child_branch, new_child_transport = self.prepare_default_stacking(
485
 
            child_format='pack-0.92')
486
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport)
487
 
        self.assertRaises(errors.UnstackableBranchFormat,
488
 
                          new_child.open_branch().get_stacked_on_url)
489
 
 
490
 
    def test_sprout_ignores_policy_for_unsupported_formats(self):
491
 
        child_branch, new_child_transport = self.prepare_default_stacking(
492
 
            child_format='pack-0.92')
493
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base)
494
 
        self.assertRaises(errors.UnstackableBranchFormat,
495
 
                          new_child.open_branch().get_stacked_on_url)
496
 
 
497
 
    def test_sprout_upgrades_format_if_stacked_specified(self):
498
 
        child_branch, new_child_transport = self.prepare_default_stacking(
499
 
            child_format='pack-0.92')
500
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
501
 
                                               stacked=True)
502
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
503
 
                         new_child.open_branch().get_stacked_on_url())
504
 
        repo = new_child.open_repository()
505
 
        self.assertTrue(repo._format.supports_external_lookups)
506
 
        self.assertFalse(repo.supports_rich_root())
507
 
 
508
 
    def test_clone_on_transport_upgrades_format_if_stacked_on_specified(self):
509
 
        child_branch, new_child_transport = self.prepare_default_stacking(
510
 
            child_format='pack-0.92')
511
 
        new_child = child_branch.bzrdir.clone_on_transport(new_child_transport,
512
 
            stacked_on=child_branch.bzrdir.root_transport.base)
513
 
        self.assertEqual(child_branch.bzrdir.root_transport.base,
514
 
                         new_child.open_branch().get_stacked_on_url())
515
 
        repo = new_child.open_repository()
516
 
        self.assertTrue(repo._format.supports_external_lookups)
517
 
        self.assertFalse(repo.supports_rich_root())
518
 
 
519
 
    def test_sprout_upgrades_to_rich_root_format_if_needed(self):
520
 
        child_branch, new_child_transport = self.prepare_default_stacking(
521
 
            child_format='rich-root-pack')
522
 
        new_child = child_branch.bzrdir.sprout(new_child_transport.base,
523
 
                                               stacked=True)
524
 
        repo = new_child.open_repository()
525
 
        self.assertTrue(repo._format.supports_external_lookups)
526
 
        self.assertTrue(repo.supports_rich_root())
527
 
 
528
 
    def test_add_fallback_repo_handles_absolute_urls(self):
529
 
        stack_on = self.make_branch('stack_on', format='1.6')
530
 
        repo = self.make_repository('repo', format='1.6')
531
 
        policy = bzrdir.UseExistingRepository(repo, stack_on.base)
532
 
        policy._add_fallback(repo)
533
 
 
534
 
    def test_add_fallback_repo_handles_relative_urls(self):
535
 
        stack_on = self.make_branch('stack_on', format='1.6')
536
 
        repo = self.make_repository('repo', format='1.6')
537
 
        policy = bzrdir.UseExistingRepository(repo, '.', stack_on.base)
538
 
        policy._add_fallback(repo)
539
 
 
540
 
    def test_configure_relative_branch_stacking_url(self):
541
 
        stack_on = self.make_branch('stack_on', format='1.6')
542
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
543
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
544
 
            '.', stack_on.base)
545
 
        policy.configure_branch(stacked)
546
 
        self.assertEqual('..', stacked.get_stacked_on_url())
547
 
 
548
 
    def test_relative_branch_stacking_to_absolute(self):
549
 
        stack_on = self.make_branch('stack_on', format='1.6')
550
 
        stacked = self.make_branch('stack_on/stacked', format='1.6')
551
 
        policy = bzrdir.UseExistingRepository(stacked.repository,
552
 
            '.', self.get_readonly_url('stack_on'))
553
 
        policy.configure_branch(stacked)
554
 
        self.assertEqual(self.get_readonly_url('stack_on'),
555
 
                         stacked.get_stacked_on_url())
556
 
 
557
 
 
558
416
class ChrootedTests(TestCaseWithTransport):
559
417
    """A support class that provides readonly urls outside the local namespace.
560
418
 
566
424
    def setUp(self):
567
425
        super(ChrootedTests, self).setUp()
568
426
        if not self.vfs_transport_factory == MemoryServer:
569
 
            self.transport_readonly_server = http_server.HttpServer
570
 
 
571
 
    def local_branch_path(self, branch):
572
 
         return os.path.realpath(urlutils.local_path_from_url(branch.base))
 
427
            self.transport_readonly_server = HttpServer
573
428
 
574
429
    def test_open_containing(self):
575
430
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
582
437
        branch, relpath = bzrdir.BzrDir.open_containing(self.get_readonly_url('g/p/q'))
583
438
        self.assertEqual('g/p/q', relpath)
584
439
 
585
 
    def test_open_containing_tree_branch_or_repository_empty(self):
586
 
        self.assertRaises(errors.NotBranchError,
587
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository,
588
 
            self.get_readonly_url(''))
589
 
 
590
 
    def test_open_containing_tree_branch_or_repository_all(self):
591
 
        self.make_branch_and_tree('topdir')
592
 
        tree, branch, repo, relpath = \
593
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
594
 
                'topdir/foo')
595
 
        self.assertEqual(os.path.realpath('topdir'),
596
 
                         os.path.realpath(tree.basedir))
597
 
        self.assertEqual(os.path.realpath('topdir'),
598
 
                         self.local_branch_path(branch))
599
 
        self.assertEqual(
600
 
            osutils.realpath(os.path.join('topdir', '.bzr', 'repository')),
601
 
            repo.bzrdir.transport.local_abspath('repository'))
602
 
        self.assertEqual(relpath, 'foo')
603
 
 
604
 
    def test_open_containing_tree_branch_or_repository_no_tree(self):
605
 
        self.make_branch('branch')
606
 
        tree, branch, repo, relpath = \
607
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
608
 
                'branch/foo')
609
 
        self.assertEqual(tree, None)
610
 
        self.assertEqual(os.path.realpath('branch'),
611
 
                         self.local_branch_path(branch))
612
 
        self.assertEqual(
613
 
            osutils.realpath(os.path.join('branch', '.bzr', 'repository')),
614
 
            repo.bzrdir.transport.local_abspath('repository'))
615
 
        self.assertEqual(relpath, 'foo')
616
 
 
617
 
    def test_open_containing_tree_branch_or_repository_repo(self):
618
 
        self.make_repository('repo')
619
 
        tree, branch, repo, relpath = \
620
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
621
 
                'repo')
622
 
        self.assertEqual(tree, None)
623
 
        self.assertEqual(branch, None)
624
 
        self.assertEqual(
625
 
            osutils.realpath(os.path.join('repo', '.bzr', 'repository')),
626
 
            repo.bzrdir.transport.local_abspath('repository'))
627
 
        self.assertEqual(relpath, '')
628
 
 
629
 
    def test_open_containing_tree_branch_or_repository_shared_repo(self):
630
 
        self.make_repository('shared', shared=True)
631
 
        bzrdir.BzrDir.create_branch_convenience('shared/branch',
632
 
                                                force_new_tree=False)
633
 
        tree, branch, repo, relpath = \
634
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
635
 
                'shared/branch')
636
 
        self.assertEqual(tree, None)
637
 
        self.assertEqual(os.path.realpath('shared/branch'),
638
 
                         self.local_branch_path(branch))
639
 
        self.assertEqual(
640
 
            osutils.realpath(os.path.join('shared', '.bzr', 'repository')),
641
 
            repo.bzrdir.transport.local_abspath('repository'))
642
 
        self.assertEqual(relpath, '')
643
 
 
644
 
    def test_open_containing_tree_branch_or_repository_branch_subdir(self):
645
 
        self.make_branch_and_tree('foo')
646
 
        self.build_tree(['foo/bar/'])
647
 
        tree, branch, repo, relpath = \
648
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
649
 
                'foo/bar')
650
 
        self.assertEqual(os.path.realpath('foo'),
651
 
                         os.path.realpath(tree.basedir))
652
 
        self.assertEqual(os.path.realpath('foo'),
653
 
                         self.local_branch_path(branch))
654
 
        self.assertEqual(
655
 
            osutils.realpath(os.path.join('foo', '.bzr', 'repository')),
656
 
            repo.bzrdir.transport.local_abspath('repository'))
657
 
        self.assertEqual(relpath, 'bar')
658
 
 
659
 
    def test_open_containing_tree_branch_or_repository_repo_subdir(self):
660
 
        self.make_repository('bar')
661
 
        self.build_tree(['bar/baz/'])
662
 
        tree, branch, repo, relpath = \
663
 
            bzrdir.BzrDir.open_containing_tree_branch_or_repository(
664
 
                'bar/baz')
665
 
        self.assertEqual(tree, None)
666
 
        self.assertEqual(branch, None)
667
 
        self.assertEqual(
668
 
            osutils.realpath(os.path.join('bar', '.bzr', 'repository')),
669
 
            repo.bzrdir.transport.local_abspath('repository'))
670
 
        self.assertEqual(relpath, 'baz')
671
 
 
672
440
    def test_open_containing_from_transport(self):
673
441
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing_from_transport,
674
442
                          get_transport(self.get_readonly_url('')))
683
451
        self.assertEqual('g/p/q', relpath)
684
452
 
685
453
    def test_open_containing_tree_or_branch(self):
 
454
        def local_branch_path(branch):
 
455
             return os.path.realpath(
 
456
                urlutils.local_path_from_url(branch.base))
 
457
 
686
458
        self.make_branch_and_tree('topdir')
687
459
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
688
460
            'topdir/foo')
689
461
        self.assertEqual(os.path.realpath('topdir'),
690
462
                         os.path.realpath(tree.basedir))
691
463
        self.assertEqual(os.path.realpath('topdir'),
692
 
                         self.local_branch_path(branch))
 
464
                         local_branch_path(branch))
693
465
        self.assertIs(tree.bzrdir, branch.bzrdir)
694
466
        self.assertEqual('foo', relpath)
695
467
        # opening from non-local should not return the tree
703
475
            'topdir/foo')
704
476
        self.assertIs(tree, None)
705
477
        self.assertEqual(os.path.realpath('topdir/foo'),
706
 
                         self.local_branch_path(branch))
 
478
                         local_branch_path(branch))
707
479
        self.assertEqual('', relpath)
708
480
 
709
 
    def test_open_tree_or_branch(self):
710
 
        self.make_branch_and_tree('topdir')
711
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
712
 
        self.assertEqual(os.path.realpath('topdir'),
713
 
                         os.path.realpath(tree.basedir))
714
 
        self.assertEqual(os.path.realpath('topdir'),
715
 
                         self.local_branch_path(branch))
716
 
        self.assertIs(tree.bzrdir, branch.bzrdir)
717
 
        # opening from non-local should not return the tree
718
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
719
 
            self.get_readonly_url('topdir'))
720
 
        self.assertEqual(None, tree)
721
 
        # without a tree:
722
 
        self.make_branch('topdir/foo')
723
 
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
724
 
        self.assertIs(tree, None)
725
 
        self.assertEqual(os.path.realpath('topdir/foo'),
726
 
                         self.local_branch_path(branch))
727
 
 
728
481
    def test_open_from_transport(self):
729
482
        # transport pointing at bzrdir should give a bzrdir with root transport
730
483
        # set to the given transport
733
486
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
734
487
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
735
488
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)
736
 
 
 
489
        
737
490
    def test_open_from_transport_no_bzrdir(self):
738
491
        transport = get_transport(self.get_url())
739
492
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
748
501
                          transport)
749
502
 
750
503
    def test_sprout_recursive(self):
751
 
        tree = self.make_branch_and_tree('tree1',
752
 
                                         format='dirstate-with-subtree')
 
504
        tree = self.make_branch_and_tree('tree1', format='dirstate-with-subtree')
753
505
        sub_tree = self.make_branch_and_tree('tree1/subtree',
754
506
            format='dirstate-with-subtree')
755
 
        sub_tree.set_root_id('subtree-root')
756
507
        tree.add_reference(sub_tree)
757
508
        self.build_tree(['tree1/subtree/file'])
758
509
        sub_tree.add('file')
759
510
        tree.commit('Initial commit')
760
 
        tree2 = tree.bzrdir.sprout('tree2').open_workingtree()
761
 
        tree2.lock_read()
762
 
        self.addCleanup(tree2.unlock)
 
511
        tree.bzrdir.sprout('tree2')
763
512
        self.failUnlessExists('tree2/subtree/file')
764
 
        self.assertEqual('tree-reference', tree2.kind('subtree-root'))
765
513
 
766
514
    def test_cloning_metadir(self):
767
515
        """Ensure that cloning metadir is suitable"""
789
537
        self.failUnlessExists('repo/tree2/subtree')
790
538
        self.failIfExists('repo/tree2/subtree/file')
791
539
 
792
 
    def make_foo_bar_baz(self):
793
 
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
794
 
        bar = self.make_branch('foo/bar').bzrdir
795
 
        baz = self.make_branch('baz').bzrdir
796
 
        return foo, bar, baz
797
 
 
798
 
    def test_find_bzrdirs(self):
799
 
        foo, bar, baz = self.make_foo_bar_baz()
800
 
        transport = get_transport(self.get_url())
801
 
        self.assertEqualBzrdirs([baz, foo, bar],
802
 
                                bzrdir.BzrDir.find_bzrdirs(transport))
803
 
 
804
 
    def test_find_bzrdirs_list_current(self):
805
 
        def list_current(transport):
806
 
            return [s for s in transport.list_dir('') if s != 'baz']
807
 
 
808
 
        foo, bar, baz = self.make_foo_bar_baz()
809
 
        transport = get_transport(self.get_url())
810
 
        self.assertEqualBzrdirs([foo, bar],
811
 
                                bzrdir.BzrDir.find_bzrdirs(transport,
812
 
                                    list_current=list_current))
813
 
 
814
 
 
815
 
    def test_find_bzrdirs_evaluate(self):
816
 
        def evaluate(bzrdir):
817
 
            try:
818
 
                repo = bzrdir.open_repository()
819
 
            except NoRepositoryPresent:
820
 
                return True, bzrdir.root_transport.base
821
 
            else:
822
 
                return False, bzrdir.root_transport.base
823
 
 
824
 
        foo, bar, baz = self.make_foo_bar_baz()
825
 
        transport = get_transport(self.get_url())
826
 
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
827
 
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
828
 
                                                         evaluate=evaluate)))
829
 
 
830
 
    def assertEqualBzrdirs(self, first, second):
831
 
        first = list(first)
832
 
        second = list(second)
833
 
        self.assertEqual(len(first), len(second))
834
 
        for x, y in zip(first, second):
835
 
            self.assertEqual(x.root_transport.base, y.root_transport.base)
836
 
 
837
 
    def test_find_branches(self):
838
 
        root = self.make_repository('', shared=True)
839
 
        foo, bar, baz = self.make_foo_bar_baz()
840
 
        qux = self.make_bzrdir('foo/qux')
841
 
        transport = get_transport(self.get_url())
842
 
        branches = bzrdir.BzrDir.find_branches(transport)
843
 
        self.assertEqual(baz.root_transport.base, branches[0].base)
844
 
        self.assertEqual(foo.root_transport.base, branches[1].base)
845
 
        self.assertEqual(bar.root_transport.base, branches[2].base)
846
 
 
847
 
        # ensure this works without a top-level repo
848
 
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
849
 
        self.assertEqual(foo.root_transport.base, branches[0].base)
850
 
        self.assertEqual(bar.root_transport.base, branches[1].base)
851
 
 
852
540
 
853
541
class TestMeta1DirFormat(TestCaseWithTransport):
854
542
    """Tests specific to the meta1 dir format."""
893
581
 
894
582
    def test_needs_conversion_different_working_tree(self):
895
583
        # meta1dirs need an conversion if any element is not the default.
896
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
897
 
        tree = self.make_branch_and_tree('tree', format='knit')
898
 
        self.assertTrue(tree.bzrdir.needs_format_conversion(
899
 
            new_format))
900
 
 
901
 
    def test_initialize_on_format_uses_smart_transport(self):
902
 
        self.setup_smart_server_with_call_log()
903
 
        new_format = bzrdir.format_registry.make_bzrdir('dirstate')
904
 
        transport = self.get_transport('target')
905
 
        transport.ensure_base()
906
 
        self.reset_smart_call_log()
907
 
        instance = new_format.initialize_on_transport(transport)
908
 
        self.assertIsInstance(instance, remote.RemoteBzrDir)
909
 
        rpc_count = len(self.hpss_calls)
910
 
        # This figure represent the amount of work to perform this use case. It
911
 
        # is entirely ok to reduce this number if a test fails due to rpc_count
912
 
        # being too low. If rpc_count increases, more network roundtrips have
913
 
        # become necessary for this use case. Please do not adjust this number
914
 
        # upwards without agreement from bzr's network support maintainers.
915
 
        self.assertEqual(2, rpc_count)
 
584
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
585
        # test with 
 
586
        new_default = bzrdir.format_registry.make_bzrdir('dirstate')
 
587
        bzrdir.BzrDirFormat._set_default_format(new_default)
 
588
        try:
 
589
            tree = self.make_branch_and_tree('tree', format='knit')
 
590
            self.assertTrue(tree.bzrdir.needs_format_conversion())
 
591
        finally:
 
592
            bzrdir.BzrDirFormat._set_default_format(old_format)
916
593
 
917
594
 
918
595
class TestFormat5(TestCaseWithTransport):
919
596
    """Tests specific to the version 5 bzrdir format."""
920
597
 
921
598
    def test_same_lockfiles_between_tree_repo_branch(self):
922
 
        # this checks that only a single lockfiles instance is created
 
599
        # this checks that only a single lockfiles instance is created 
923
600
        # for format 5 objects
924
601
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
925
602
        def check_dir_components_use_same_lock(dir):
932
609
        # and if we open it normally.
933
610
        dir = bzrdir.BzrDir.open(self.get_url())
934
611
        check_dir_components_use_same_lock(dir)
935
 
 
 
612
    
936
613
    def test_can_convert(self):
937
614
        # format 5 dirs are convertable
938
615
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
939
616
        self.assertTrue(dir.can_convert_format())
940
 
 
 
617
    
941
618
    def test_needs_conversion(self):
942
 
        # format 5 dirs need a conversion if they are not the default,
943
 
        # and they aren't
944
 
        dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
945
 
        # don't need to convert it to itself
946
 
        self.assertFalse(dir.needs_format_conversion(bzrdir.BzrDirFormat5()))
947
 
        # do need to convert it to the current default
948
 
        self.assertTrue(dir.needs_format_conversion(
949
 
            bzrdir.BzrDirFormat.get_default_format()))
 
619
        # format 5 dirs need a conversion if they are not the default.
 
620
        # and they start of not the default.
 
621
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
622
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
 
623
        try:
 
624
            dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
 
625
            self.assertFalse(dir.needs_format_conversion())
 
626
        finally:
 
627
            bzrdir.BzrDirFormat._set_default_format(old_format)
 
628
        self.assertTrue(dir.needs_format_conversion())
950
629
 
951
630
 
952
631
class TestFormat6(TestCaseWithTransport):
953
632
    """Tests specific to the version 6 bzrdir format."""
954
633
 
955
634
    def test_same_lockfiles_between_tree_repo_branch(self):
956
 
        # this checks that only a single lockfiles instance is created
 
635
        # this checks that only a single lockfiles instance is created 
957
636
        # for format 6 objects
958
637
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
959
638
        def check_dir_components_use_same_lock(dir):
966
645
        # and if we open it normally.
967
646
        dir = bzrdir.BzrDir.open(self.get_url())
968
647
        check_dir_components_use_same_lock(dir)
969
 
 
 
648
    
970
649
    def test_can_convert(self):
971
650
        # format 6 dirs are convertable
972
651
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
973
652
        self.assertTrue(dir.can_convert_format())
974
 
 
 
653
    
975
654
    def test_needs_conversion(self):
976
655
        # format 6 dirs need an conversion if they are not the default.
977
 
        dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
978
 
        self.assertTrue(dir.needs_format_conversion(
979
 
            bzrdir.BzrDirFormat.get_default_format()))
 
656
        old_format = bzrdir.BzrDirFormat.get_default_format()
 
657
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
 
658
        try:
 
659
            dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
 
660
            self.assertTrue(dir.needs_format_conversion())
 
661
        finally:
 
662
            bzrdir.BzrDirFormat._set_default_format(old_format)
980
663
 
981
664
 
982
665
class NotBzrDir(bzrlib.bzrdir.BzrDir):
1013
696
 
1014
697
class TestNotBzrDir(TestCaseWithTransport):
1015
698
    """Tests for using the bzrdir api with a non .bzr based disk format.
1016
 
 
 
699
    
1017
700
    If/when one of these is in the core, we can let the implementation tests
1018
701
    verify this works.
1019
702
    """
1020
703
 
1021
704
    def test_create_and_find_format(self):
1022
 
        # create a .notbzr dir
 
705
        # create a .notbzr dir 
1023
706
        format = NotBzrDirFormat()
1024
707
        dir = format.initialize(self.get_url())
1025
708
        self.assertIsInstance(dir, NotBzrDir)
1050
733
    def setUp(self):
1051
734
        super(NonLocalTests, self).setUp()
1052
735
        self.vfs_transport_factory = MemoryServer
1053
 
 
 
736
    
1054
737
    def test_create_branch_convenience(self):
1055
738
        # outside a repo the default convenience output is a repo+branch_tree
1056
739
        format = bzrdir.format_registry.make_bzrdir('knit')
1093
776
                              workingtree.WorkingTreeFormat3)
1094
777
 
1095
778
 
1096
 
class TestHTTPRedirections(object):
1097
 
    """Test redirection between two http servers.
 
779
class TestHTTPRedirectionLoop(object):
 
780
    """Test redirection loop between two http servers.
1098
781
 
1099
782
    This MUST be used by daughter classes that also inherit from
1100
783
    TestCaseWithTwoWebservers.
1101
784
 
1102
785
    We can't inherit directly from TestCaseWithTwoWebservers or the
1103
786
    test framework will try to create an instance which cannot
1104
 
    run, its implementation being incomplete.
 
787
    run, its implementation being incomplete. 
1105
788
    """
1106
789
 
 
790
    # Should be defined by daughter classes to ensure redirection
 
791
    # still use the same transport implementation (not currently
 
792
    # enforced as it's a bit tricky to get right (see the FIXME
 
793
    # in BzrDir.open_from_transport for the unique use case so
 
794
    # far)
 
795
    _qualifier = None
 
796
 
1107
797
    def create_transport_readonly_server(self):
1108
 
        return http_utils.HTTPServerRedirecting()
 
798
        return HTTPServerRedirecting()
1109
799
 
1110
800
    def create_transport_secondary_server(self):
1111
 
        return http_utils.HTTPServerRedirecting()
 
801
        return HTTPServerRedirecting()
1112
802
 
1113
803
    def setUp(self):
1114
 
        super(TestHTTPRedirections, self).setUp()
 
804
        # Both servers redirect to each server creating a loop
 
805
        super(TestHTTPRedirectionLoop, self).setUp()
1115
806
        # The redirections will point to the new server
1116
807
        self.new_server = self.get_readonly_server()
1117
808
        # The requests to the old server will be redirected
1118
809
        self.old_server = self.get_secondary_server()
1119
810
        # Configure the redirections
1120
811
        self.old_server.redirect_to(self.new_server.host, self.new_server.port)
 
812
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
 
813
 
 
814
    def _qualified_url(self, host, port):
 
815
        return 'http+%s://%s:%s' % (self._qualifier, host, port)
1121
816
 
1122
817
    def test_loop(self):
1123
 
        # Both servers redirect to each other creating a loop
1124
 
        self.new_server.redirect_to(self.old_server.host, self.old_server.port)
1125
818
        # Starting from either server should loop
1126
 
        old_url = self._qualified_url(self.old_server.host,
 
819
        old_url = self._qualified_url(self.old_server.host, 
1127
820
                                      self.old_server.port)
1128
821
        oldt = self._transport(old_url)
1129
822
        self.assertRaises(errors.NotBranchError,
1130
823
                          bzrdir.BzrDir.open_from_transport, oldt)
1131
 
        new_url = self._qualified_url(self.new_server.host,
 
824
        new_url = self._qualified_url(self.new_server.host, 
1132
825
                                      self.new_server.port)
1133
826
        newt = self._transport(new_url)
1134
827
        self.assertRaises(errors.NotBranchError,
1135
828
                          bzrdir.BzrDir.open_from_transport, newt)
1136
829
 
1137
 
    def test_qualifier_preserved(self):
1138
 
        wt = self.make_branch_and_tree('branch')
1139
 
        old_url = self._qualified_url(self.old_server.host,
1140
 
                                      self.old_server.port)
1141
 
        start = self._transport(old_url).clone('branch')
1142
 
        bdir = bzrdir.BzrDir.open_from_transport(start)
1143
 
        # Redirection should preserve the qualifier, hence the transport class
1144
 
        # itself.
1145
 
        self.assertIsInstance(bdir.root_transport, type(start))
1146
 
 
1147
 
 
1148
 
class TestHTTPRedirections_urllib(TestHTTPRedirections,
1149
 
                                  http_utils.TestCaseWithTwoWebservers):
 
830
 
 
831
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
 
832
                                  TestCaseWithTwoWebservers):
1150
833
    """Tests redirections for urllib implementation"""
1151
834
 
 
835
    _qualifier = 'urllib'
1152
836
    _transport = HttpTransport_urllib
1153
837
 
1154
 
    def _qualified_url(self, host, port):
1155
 
        result = 'http+urllib://%s:%s' % (host, port)
1156
 
        self.permit_url(result)
1157
 
        return result
1158
 
 
1159
838
 
1160
839
 
1161
840
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
1162
 
                                  TestHTTPRedirections,
1163
 
                                  http_utils.TestCaseWithTwoWebservers):
 
841
                                  TestHTTPRedirectionLoop,
 
842
                                  TestCaseWithTwoWebservers):
1164
843
    """Tests redirections for pycurl implementation"""
1165
844
 
1166
 
    def _qualified_url(self, host, port):
1167
 
        result = 'http+pycurl://%s:%s' % (host, port)
1168
 
        self.permit_url(result)
1169
 
        return result
1170
 
 
1171
 
 
1172
 
class TestHTTPRedirections_nosmart(TestHTTPRedirections,
1173
 
                                  http_utils.TestCaseWithTwoWebservers):
1174
 
    """Tests redirections for the nosmart decorator"""
1175
 
 
1176
 
    _transport = NoSmartTransportDecorator
1177
 
 
1178
 
    def _qualified_url(self, host, port):
1179
 
        result = 'nosmart+http://%s:%s' % (host, port)
1180
 
        self.permit_url(result)
1181
 
        return result
1182
 
 
1183
 
 
1184
 
class TestHTTPRedirections_readonly(TestHTTPRedirections,
1185
 
                                    http_utils.TestCaseWithTwoWebservers):
1186
 
    """Tests redirections for readonly decoratror"""
1187
 
 
1188
 
    _transport = ReadonlyTransportDecorator
1189
 
 
1190
 
    def _qualified_url(self, host, port):
1191
 
        result = 'readonly+http://%s:%s' % (host, port)
1192
 
        self.permit_url(result)
1193
 
        return result
1194
 
 
1195
 
 
1196
 
class TestDotBzrHidden(TestCaseWithTransport):
1197
 
 
1198
 
    ls = ['ls']
1199
 
    if sys.platform == 'win32':
1200
 
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']
1201
 
 
1202
 
    def get_ls(self):
1203
 
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
1204
 
            stderr=subprocess.PIPE)
1205
 
        out, err = f.communicate()
1206
 
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
1207
 
                         % (self.ls, err))
1208
 
        return out.splitlines()
1209
 
 
1210
 
    def test_dot_bzr_hidden(self):
1211
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1212
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1213
 
        b = bzrdir.BzrDir.create('.')
1214
 
        self.build_tree(['a'])
1215
 
        self.assertEquals(['a'], self.get_ls())
1216
 
 
1217
 
    def test_dot_bzr_hidden_with_url(self):
1218
 
        if sys.platform == 'win32' and not win32utils.has_win32file:
1219
 
            raise TestSkipped('unable to make file hidden without pywin32 library')
1220
 
        b = bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
1221
 
        self.build_tree(['a'])
1222
 
        self.assertEquals(['a'], self.get_ls())
1223
 
 
1224
 
 
1225
 
class _TestBzrDirFormat(bzrdir.BzrDirMetaFormat1):
1226
 
    """Test BzrDirFormat implementation for TestBzrDirSprout."""
1227
 
 
1228
 
    def _open(self, transport):
1229
 
        return _TestBzrDir(transport, self)
1230
 
 
1231
 
 
1232
 
class _TestBzrDir(bzrdir.BzrDirMeta1):
1233
 
    """Test BzrDir implementation for TestBzrDirSprout.
1234
 
 
1235
 
    When created a _TestBzrDir already has repository and a branch.  The branch
1236
 
    is a test double as well.
1237
 
    """
1238
 
 
1239
 
    def __init__(self, *args, **kwargs):
1240
 
        super(_TestBzrDir, self).__init__(*args, **kwargs)
1241
 
        self.test_branch = _TestBranch()
1242
 
        self.test_branch.repository = self.create_repository()
1243
 
 
1244
 
    def open_branch(self, unsupported=False):
1245
 
        return self.test_branch
1246
 
 
1247
 
    def cloning_metadir(self, require_stacking=False):
1248
 
        return _TestBzrDirFormat()
1249
 
 
1250
 
 
1251
 
class _TestBranchFormat(bzrlib.branch.BranchFormat):
1252
 
    """Test Branch format for TestBzrDirSprout."""
1253
 
 
1254
 
 
1255
 
class _TestBranch(bzrlib.branch.Branch):
1256
 
    """Test Branch implementation for TestBzrDirSprout."""
1257
 
 
1258
 
    def __init__(self, *args, **kwargs):
1259
 
        self._format = _TestBranchFormat()
1260
 
        super(_TestBranch, self).__init__(*args, **kwargs)
1261
 
        self.calls = []
1262
 
        self._parent = None
1263
 
 
1264
 
    def sprout(self, *args, **kwargs):
1265
 
        self.calls.append('sprout')
1266
 
        return _TestBranch()
1267
 
 
1268
 
    def copy_content_into(self, destination, revision_id=None):
1269
 
        self.calls.append('copy_content_into')
1270
 
 
1271
 
    def get_parent(self):
1272
 
        return self._parent
1273
 
 
1274
 
    def set_parent(self, parent):
1275
 
        self._parent = parent
1276
 
 
1277
 
 
1278
 
class TestBzrDirSprout(TestCaseWithMemoryTransport):
1279
 
 
1280
 
    def test_sprout_uses_branch_sprout(self):
1281
 
        """BzrDir.sprout calls Branch.sprout.
1282
 
 
1283
 
        Usually, BzrDir.sprout should delegate to the branch's sprout method
1284
 
        for part of the work.  This allows the source branch to control the
1285
 
        choice of format for the new branch.
1286
 
 
1287
 
        There are exceptions, but this tests avoids them:
1288
 
          - if there's no branch in the source bzrdir,
1289
 
          - or if the stacking has been requested and the format needs to be
1290
 
            overridden to satisfy that.
1291
 
        """
1292
 
        # Make an instrumented bzrdir.
1293
 
        t = self.get_transport('source')
1294
 
        t.ensure_base()
1295
 
        source_bzrdir = _TestBzrDirFormat().initialize_on_transport(t)
1296
 
        # The instrumented bzrdir has a test_branch attribute that logs calls
1297
 
        # made to the branch contained in that bzrdir.  Initially the test
1298
 
        # branch exists but no calls have been made to it.
1299
 
        self.assertEqual([], source_bzrdir.test_branch.calls)
1300
 
 
1301
 
        # Sprout the bzrdir
1302
 
        target_url = self.get_url('target')
1303
 
        result = source_bzrdir.sprout(target_url, recurse='no')
1304
 
 
1305
 
        # The bzrdir called the branch's sprout method.
1306
 
        self.assertSubset(['sprout'], source_bzrdir.test_branch.calls)
1307
 
 
1308
 
    def test_sprout_parent(self):
1309
 
        grandparent_tree = self.make_branch('grandparent')
1310
 
        parent = grandparent_tree.bzrdir.sprout('parent').open_branch()
1311
 
        branch_tree = parent.bzrdir.sprout('branch').open_branch()
1312
 
        self.assertContainsRe(branch_tree.get_parent(), '/parent/$')
1313
 
 
1314
 
 
1315
 
class TestBzrDirHooks(TestCaseWithMemoryTransport):
1316
 
 
1317
 
    def test_pre_open_called(self):
1318
 
        calls = []
1319
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', calls.append, None)
1320
 
        transport = self.get_transport('foo')
1321
 
        url = transport.base
1322
 
        self.assertRaises(errors.NotBranchError, bzrdir.BzrDir.open, url)
1323
 
        self.assertEqual([transport.base], [t.base for t in calls])
1324
 
 
1325
 
    def test_pre_open_actual_exceptions_raised(self):
1326
 
        count = [0]
1327
 
        def fail_once(transport):
1328
 
            count[0] += 1
1329
 
            if count[0] == 1:
1330
 
                raise errors.BzrError("fail")
1331
 
        bzrdir.BzrDir.hooks.install_named_hook('pre_open', fail_once, None)
1332
 
        transport = self.get_transport('foo')
1333
 
        url = transport.base
1334
 
        err = self.assertRaises(errors.BzrError, bzrdir.BzrDir.open, url)
1335
 
        self.assertEqual('fail', err._preformatted_string)
 
845
    _qualifier = 'pycurl'