~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-11-03 23:02:16 UTC
  • mfrom: (2951.1.1 pack)
  • Revision ID: pqm@pqm.ubuntu.com-20071103230216-mnmwuxm413lyhjdv
(robertc) Fix data-refresh logic for packs not to refresh mid-transaction when a names write lock is held. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/bzr_dir_implementations.
 
20
"""
 
21
 
 
22
import os.path
 
23
from StringIO import StringIO
 
24
 
 
25
from bzrlib import (
 
26
    bzrdir,
 
27
    errors,
 
28
    help_topics,
 
29
    repository,
 
30
    symbol_versioning,
 
31
    urlutils,
 
32
    workingtree,
 
33
    )
 
34
import bzrlib.branch
 
35
from bzrlib.errors import (NotBranchError,
 
36
                           UnknownFormatError,
 
37
                           UnsupportedFormatError,
 
38
                           )
 
39
from bzrlib.symbol_versioning import (
 
40
    zero_ninetyone,
 
41
    )
 
42
from bzrlib.tests import (
 
43
    TestCase,
 
44
    TestCaseWithTransport,
 
45
    test_sftp_transport
 
46
    )
 
47
from bzrlib.tests.HttpServer import HttpServer
 
48
from bzrlib.tests.HTTPTestUtil import (
 
49
    TestCaseWithTwoWebservers,
 
50
    HTTPServerRedirecting,
 
51
    )
 
52
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
53
from bzrlib.transport import get_transport
 
54
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
55
from bzrlib.transport.memory import MemoryServer
 
56
from bzrlib.repofmt import knitrepo, weaverepo
 
57
 
 
58
 
 
59
class TestDefaultFormat(TestCase):
    """The default BzrDirFormat can be read, swapped out and restored."""

    def test_get_set_default_format(self):
        format_cls = bzrdir.BzrDirFormat
        original = format_cls.get_default_format()
        # The stock default is expected to be a metadir format.
        self.assertTrue(isinstance(original, bzrdir.BzrDirMetaFormat1))
        # set_default_format is deprecated, so it has to be invoked through
        # applyDeprecated.
        self.applyDeprecated(
            symbol_versioning.zero_fourteen,
            format_cls.set_default_format,
            SampleBzrDirFormat())
        try:
            # With the sample format installed as the default, a plain create
            # must hand back the instrumented SampleBzrDir.
            created = bzrdir.BzrDir.create('memory:///')
            self.assertTrue(isinstance(created, SampleBzrDir))
        finally:
            # Always put the real default back, even if the check failed.
            self.applyDeprecated(
                symbol_versioning.zero_fourteen,
                format_cls.set_default_format,
                original)
        self.assertEqual(original, format_cls.get_default_format())
 
76
 
 
77
 
 
78
class TestFormatRegistry(TestCase):
    """Exercise BzrDirFormatRegistry, mostly via a hand-built registry."""

    def make_format_registry(self):
        """Return a fresh registry populated with a fixed set of formats.

        The entries cover direct, lazy and metadir registration, and include
        deprecated, experimental and hidden formats.  'knit' is made the
        default.
        """
        weave_help = ('Pre-0.8 format.  Slower and does not support checkouts'
                      ' or shared repositories')
        lazy_help = 'Format registered lazily'
        knit3_name = 'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3'
        knit3_help = 'Experimental successor to knit.  Use at your own risk.'
        branch6_name = 'bzrlib.branch.BzrBranchFormat6'

        registry = bzrdir.BzrDirFormatRegistry()
        registry.register('weave', bzrdir.BzrDirFormat6, weave_help,
                          deprecated=True)
        registry.register_lazy('lazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               lazy_help, deprecated=True)
        registry.register_metadir(
            'knit',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
            'Format using knits')
        registry.set_default('knit')
        registry.register_metadir('branch6', knit3_name, knit3_help,
                                  branch_format=branch6_name,
                                  experimental=True)
        registry.register_metadir('hidden format', knit3_name, knit3_help,
                                  branch_format=branch6_name, hidden=True)
        registry.register('hiddenweave', bzrdir.BzrDirFormat6, weave_help,
                          hidden=True)
        registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               lazy_help, deprecated=True, hidden=True)
        return registry

    def test_format_registry(self):
        registry = self.make_format_registry()
        # Directly and lazily registered entries both yield BzrDirFormat6.
        for key in ('lazy', 'weave'):
            made = registry.make_bzrdir(key)
            self.assertIsInstance(made, bzrdir.BzrDirFormat6)
        # 'default' was pointed at 'knit', so both give a Knit1 repository.
        for key in ('default', 'knit'):
            made = registry.make_bzrdir(key)
            self.assertIsInstance(made.repository_format,
                                  knitrepo.RepositoryFormatKnit1)
        made = registry.make_bzrdir('branch6')
        self.assertIsInstance(made.get_branch_format(),
                              bzrlib.branch.BzrBranchFormat6)

    def test_get_help(self):
        registry = self.make_format_registry()
        expectations = [
            ('lazy', 'Format registered lazily'),
            ('knit', 'Format using knits'),
            ('default', 'Format using knits'),
            ('weave', 'Pre-0.8 format.  Slower and does not support'
                      ' checkouts or shared repositories'),
            ]
        for key, help_text in expectations:
            self.assertEqual(help_text, registry.get_help(key))

    def test_help_topic(self):
        registry = self.make_format_registry()
        topics = help_topics.HelpTopicRegistry()
        topics.register('formats', registry.help_topic, 'Directory formats')
        detail = topics.get_detail('formats')
        # Split the rendered topic into its three sections, using the section
        # headings as separators.
        current_part, remainder = detail.split('Experimental formats')
        experimental_part, deprecated_part = remainder.split(
            'Deprecated formats')
        self.assertContainsRe(current_part, 'These formats can be used')
        self.assertContainsRe(
            current_part,
            ':knit:\n    \(native\) \(default\) Format using knits\n')
        self.assertContainsRe(
            experimental_part,
            ':branch6:\n    \(native\) Experimental successor to knit')
        self.assertContainsRe(
            deprecated_part,
            ':lazy:\n    \(native\) Format registered lazily\n')
        # Hidden formats must not be listed among the current formats.
        self.assertNotContainsRe(current_part, 'hidden')

    def test_set_default_repository(self):
        global_registry = bzrdir.format_registry
        default_factory = global_registry.get('default')
        # Recover the real key the 'default' alias currently points at so it
        # can be restored afterwards.
        previous_key = [key for key, factory in global_registry.iteritems()
                        if factory == default_factory
                        and key != 'default'][0]
        global_registry.set_default_repository('dirstate-with-subtree')
        try:
            self.assertIs(global_registry.get('dirstate-with-subtree'),
                          global_registry.get('default'))
            default_repo_format = (
                repository.RepositoryFormat.get_default_format())
            self.assertIs(default_repo_format.__class__,
                          knitrepo.RepositoryFormatKnit3)
        finally:
            global_registry.set_default_repository(previous_key)
 
168
 
 
169
 
 
170
class SampleBranch(bzrlib.branch.Branch):
    """Minimal stand-in branch that only remembers its owning bzrdir.

    The base class initialiser is deliberately not invoked here.
    """

    def __init__(self, dir):
        # Only the back-reference to the control directory is stored.
        self.bzrdir = dir
 
175
 
 
176
 
 
177
class SampleBzrDir(bzrdir.BzrDir):
    """Instrumented BzrDir whose factory methods return fixed markers.

    Tests of the BzrDir static/convenience methods use the marker values to
    detect which method was dispatched to.
    """

    def create_repository(self, shared=False):
        """See BzrDir.create_repository; returns a fixed marker string."""
        return "A repository"

    def open_repository(self):
        """See BzrDir.open_repository; returns a fixed marker string."""
        return "A repository"

    def create_branch(self):
        """See BzrDir.create_branch; returns a SampleBranch for this dir."""
        return SampleBranch(self)

    def create_workingtree(self):
        """See BzrDir.create_workingtree; returns a fixed marker string."""
        return "A tree"
 
195
 
 
196
 
 
197
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
    """A sample bzrdir format for tests.

    The format can be initialized but reports itself as unsupported, which
    helps exercise the open and open_unsupported code paths.
    """

    def get_format_string(self):
        """See BzrDirFormat.get_format_string()."""
        return "Sample .bzr dir format."

    def initialize_on_transport(self, t):
        """Create a '.bzr' directory on t carrying this format's marker."""
        t.mkdir('.bzr')
        format_marker = self.get_format_string()
        t.put_bytes('.bzr/branch-format', format_marker)
        return SampleBzrDir(t, self)

    def is_supported(self):
        # Claiming to be unsupported forces callers onto the downlevel path.
        return False

    def open(self, transport, _found=None):
        # A fixed marker, so tests can recognise that this method was used.
        return "opened branch."
 
219
 
 
220
 
 
221
class TestBzrDirFormat(TestCaseWithTransport):
    """Tests for the BzrDirFormat facility."""

    def test_find_format(self):
        # Is the right format object found for a control directory?
        # Initialize directories with a few known formats and check that
        # probing each location reports the format it was created with.
        t = get_transport(self.get_url())
        self.build_tree(["foo/", "bar/"], transport=t)

        def check_format(format, url):
            format.initialize(url)
            t = get_transport(url)
            found_format = bzrdir.BzrDirFormat.find_format(t)
            self.failUnless(isinstance(found_format, format.__class__))

        check_format(bzrdir.BzrDirFormat5(), "foo")
        check_format(bzrdir.BzrDirFormat6(), "bar")

    def test_find_format_nothing_there(self):
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_find_format_unknown_format(self):
        t = get_transport(self.get_url())
        t.mkdir('.bzr')
        # An empty branch-format file matches no registered format string.
        t.put_bytes('.bzr/branch-format', '')
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_register_unregister_format(self):
        format = SampleBzrDirFormat()
        url = self.get_url()
        # make a bzrdir
        format.initialize(url)
        # register a format for it.
        bzrdir.BzrDirFormat.register_format(format)
        try:
            # which BzrDir.open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              bzrdir.BzrDir.open, url)
            # which BzrDir.open_containing will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              bzrdir.BzrDir.open_containing, url)
            # but open_unsupported will work
            t = get_transport(url)
            self.assertEqual(format.open(t),
                             bzrdir.BzrDir.open_unsupported(url))
        finally:
            # Unregister the format even when an assertion above fails, so
            # the sample format does not stay registered for later tests.
            bzrdir.BzrDirFormat.unregister_format(format)
        # now open_unsupported should fail too.
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDir.open_unsupported, url)

    def test_create_repository_deprecated(self):
        # new interface is to make the bzrdir, then a repository within that.
        format = SampleBzrDirFormat()
        repo = self.applyDeprecated(zero_ninetyone,
                bzrdir.BzrDir.create_repository,
                self.get_url(), format=format)
        self.assertEqual('A repository', repo)

    def test_create_repository_shared(self):
        # new interface is to make the bzrdir, then a repository within that.
        repo = self.applyDeprecated(zero_ninetyone,
                bzrdir.BzrDir.create_repository,
                '.', shared=True)
        self.assertTrue(repo.is_shared())

    def test_create_repository_nonshared(self):
        # new interface is to make the bzrdir, then a repository within that.
        repo = self.applyDeprecated(zero_ninetyone,
                bzrdir.BzrDir.create_repository,
                '.')
        self.assertFalse(repo.is_shared())

    def test_create_repository_under_shared(self):
        # an explicit create_repository always does so.
        # we trust the format is right from the 'create_repository test'
        # new interface is to make the bzrdir, then a repository within that.
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        repo = self.applyDeprecated(zero_ninetyone,
                bzrdir.BzrDir.create_repository,
                self.get_url('child'),
                format=format)
        self.assertTrue(isinstance(repo, repository.Repository))
        self.assertTrue(repo.bzrdir.root_transport.base.endswith('child/'))

    def test_create_branch_and_repo_uses_default(self):
        format = SampleBzrDirFormat()
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                      format=format)
        self.assertTrue(isinstance(branch, SampleBranch))

    def test_create_branch_and_repo_under_shared(self):
        # creating a branch and repo in a shared repo uses the
        # shared repository
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(
            self.get_url('child'), format=format)
        # The child holds no repository of its own.
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_and_repo_under_shared_force_new(self):
        # creating a branch and repo in a shared repo can be forced to
        # make a new repo
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                      force_new_repo=True,
                                                      format=format)
        branch.bzrdir.open_repository()

    def test_create_standalone_working_tree(self):
        format = SampleBzrDirFormat()
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url(), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
                                                           format=format)
        self.assertEqual('A tree', tree)

    def test_create_standalone_working_tree_under_shared_repo(self):
        # create standalone working tree always makes a repo.
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url('child'), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
            format=format)
        tree.bzrdir.open_repository()

    def test_create_branch_convenience(self):
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_possible_transports(self):
        """Check that the optional 'possible_transports' is recognized"""
        format = bzrdir.format_registry.make_bzrdir('knit')
        t = self.get_transport()
        branch = bzrdir.BzrDir.create_branch_convenience(
            '.', format=format, possible_transports=[t])
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_root(self):
        """Creating a branch at the root of a fs should work."""
        self.vfs_transport_factory = MemoryServer
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                         format=format)
        # This test expects no working tree on the memory transport.
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_under_shared_repo(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_tree=False, format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_tree=True, format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
        # inside a repo the default convenience output is overridable to give
        # repo+branch+tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
            force_new_repo=True, format=format)
        branch.bzrdir.open_repository()
        branch.bzrdir.open_workingtree()
 
441
 
 
442
 
 
443
class ChrootedTests(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    setUp compares self.vfs_transport_factory with MemoryServer: when they
    are equal the test is treated as chrooted already, otherwise HttpServer
    is installed as the readonly server.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        if self.vfs_transport_factory == MemoryServer:
            # Nothing to change when running against a memory server.
            return
        self.transport_readonly_server = HttpServer

    def test_open_containing(self):
        opener = bzrdir.BzrDir.open_containing
        # Before any control directory exists, nothing can be opened.
        self.assertRaises(NotBranchError, opener,
                          self.get_readonly_url(''))
        self.assertRaises(NotBranchError, opener,
                          self.get_readonly_url('g/p/q'))
        bzrdir.BzrDir.create(self.get_url())
        # Once it exists, it is found from the root and from below it.
        found, relpath = opener(self.get_readonly_url(''))
        self.assertEqual('', relpath)
        found, relpath = opener(self.get_readonly_url('g/p/q'))
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_from_transport(self):
        opener = bzrdir.BzrDir.open_containing_from_transport
        self.assertRaises(NotBranchError, opener,
                          get_transport(self.get_readonly_url('')))
        self.assertRaises(NotBranchError, opener,
                          get_transport(self.get_readonly_url('g/p/q')))
        bzrdir.BzrDir.create(self.get_url())
        root_transport = get_transport(self.get_readonly_url(''))
        found, relpath = opener(root_transport)
        self.assertEqual('', relpath)
        nested_transport = get_transport(self.get_readonly_url('g/p/q'))
        found, relpath = opener(nested_transport)
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_tree_or_branch(self):
        def local_branch_path(branch):
            # Resolve the branch's base url to a canonical filesystem path.
            local_path = urlutils.local_path_from_url(branch.base)
            return os.path.realpath(local_path)

        opener = bzrdir.BzrDir.open_containing_tree_or_branch
        self.make_branch_and_tree('topdir')
        tree, branch, relpath = opener('topdir/foo')
        expected_top = os.path.realpath('topdir')
        self.assertEqual(expected_top, os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         local_branch_path(branch))
        self.assertIs(tree.bzrdir, branch.bzrdir)
        self.assertEqual('foo', relpath)
        # opening from non-local should not return the tree
        tree, branch, relpath = opener(self.get_readonly_url('topdir/foo'))
        self.assertEqual(None, tree)
        self.assertEqual('foo', relpath)
        # without a tree:
        self.make_branch('topdir/foo')
        tree, branch, relpath = opener('topdir/foo')
        self.assertIs(tree, None)
        self.assertEqual(os.path.realpath('topdir/foo'),
                         local_branch_path(branch))
        self.assertEqual('', relpath)

    def test_open_from_transport(self):
        # transport pointing at bzrdir should give a bzrdir with root transport
        # set to the given transport
        bzrdir.BzrDir.create(self.get_url())
        transport = get_transport(self.get_url())
        opened = bzrdir.BzrDir.open_from_transport(transport)
        self.assertEqual(transport.base, opened.root_transport.base)
        self.assertIsInstance(opened, bzrdir.BzrDir)

    def test_open_from_transport_no_bzrdir(self):
        empty_transport = get_transport(self.get_url())
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                          empty_transport)

    def test_open_from_transport_bzrdir_in_parent(self):
        bzrdir.BzrDir.create(self.get_url())
        parent = get_transport(self.get_url())
        parent.mkdir('subdir')
        child = parent.clone('subdir')
        # open_from_transport must not walk up to the parent's bzrdir.
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                          child)

    def test_sprout_recursive(self):
        subtree_format = 'dirstate-with-subtree'
        outer = self.make_branch_and_tree('tree1', format=subtree_format)
        inner = self.make_branch_and_tree('tree1/subtree',
                                          format=subtree_format)
        outer.add_reference(inner)
        self.build_tree(['tree1/subtree/file'])
        inner.add('file')
        outer.commit('Initial commit')
        outer.bzrdir.sprout('tree2')
        self.failUnlessExists('tree2/subtree/file')

    def test_cloning_metadir(self):
        """Ensure that cloning metadir is suitable"""
        # The local is named so that it does not shadow the bzrdir module.
        control_dir = self.make_bzrdir('bzrdir')
        control_dir.cloning_metadir()
        knit_branch = self.make_branch('branch', format='knit')
        cloned_format = knit_branch.bzrdir.cloning_metadir()
        self.assertIsInstance(cloned_format.workingtree_format,
                              workingtree.WorkingTreeFormat3)

    def test_sprout_recursive_treeless(self):
        subtree_format = 'dirstate-with-subtree'
        outer = self.make_branch_and_tree('tree1', format=subtree_format)
        inner = self.make_branch_and_tree('tree1/subtree',
                                          format=subtree_format)
        outer.add_reference(inner)
        self.build_tree(['tree1/subtree/file'])
        inner.add('file')
        outer.commit('Initial commit')
        outer.bzrdir.destroy_workingtree()
        shared_repo = self.make_repository('repo', shared=True,
                                           format=subtree_format)
        shared_repo.set_make_working_trees(False)
        outer.bzrdir.sprout('repo/tree2')
        # The subtree directory is created, but no files are checked out.
        self.failUnlessExists('repo/tree2/subtree')
        self.failIfExists('repo/tree2/subtree/file')
 
566
 
 
567
 
 
568
class TestMeta1DirFormat(TestCaseWithTransport):
    """Tests specific to the meta1 dir format."""

    def test_right_base_dirs(self):
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control_transport = control.transport

        # Branch data lives under 'branch', with or without a format given.
        expected = control_transport.clone('branch').base
        self.assertEqual(expected, control.get_branch_transport(None).base)
        branch_format = bzrlib.branch.BzrBranchFormat5()
        self.assertEqual(
            expected, control.get_branch_transport(branch_format).base)

        # Repository data lives under 'repository'.
        expected = control_transport.clone('repository').base
        self.assertEqual(
            expected, control.get_repository_transport(None).base)
        repo_format = weaverepo.RepositoryFormat7()
        self.assertEqual(
            expected, control.get_repository_transport(repo_format).base)

        # Working tree data lives under 'checkout'.
        expected = control_transport.clone('checkout').base
        self.assertEqual(
            expected, control.get_workingtree_transport(None).base)
        tree_format = workingtree.WorkingTreeFormat3()
        self.assertEqual(
            expected, control.get_workingtree_transport(tree_format).base)

    def test_meta1dir_uses_lockdir(self):
        """Meta1 format uses a LockDir to guard the whole directory, not a file."""
        control = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control_transport = control.transport
        self.assertIsDirectory('branch-lock', control_transport)

    def test_comparison(self):
        """Equality and inequality behave properly.

        Metadirs should compare equal iff they have the same repo, branch and
        tree formats.
        """
        make = bzrdir.format_registry.make_bzrdir
        first = make('knit')
        self.assertEqual(first, first)
        self.assertFalse(first != first)
        same_kind = make('knit')
        self.assertEqual(same_kind, first)
        self.assertFalse(same_kind != first)
        different_kind = make('dirstate-with-subtree')
        self.assertNotEqual(different_kind, first)
        self.assertFalse(different_kind == first)

    def test_needs_conversion_different_working_tree(self):
        # meta1dirs need a conversion if any element is not the default.
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        # Test with 'dirstate' installed as the default, against a tree that
        # was created with the 'knit' format.
        replacement = bzrdir.format_registry.make_bzrdir('dirstate')
        bzrdir.BzrDirFormat._set_default_format(replacement)
        try:
            knit_tree = self.make_branch_and_tree('tree', format='knit')
            self.assertTrue(knit_tree.bzrdir.needs_format_conversion())
        finally:
            bzrdir.BzrDirFormat._set_default_format(saved_default)
 
620
 
 
621
 
 
622
class TestFormat5(TestCaseWithTransport):
    """Checks that apply only to the version 5 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree expose the same control_files object.

        Only a single lockfiles instance should be created for format 5
        objects.
        """
        def assert_shared_control_files(control_dir):
            repo_files = control_dir.open_repository().control_files
            branch_files = control_dir.open_branch().control_files
            tree_files = control_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        # First on the object handed back by initialize() ...
        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        assert_shared_control_files(control_dir)
        # ... then on one obtained by opening the location normally.
        control_dir = bzrdir.BzrDir.open(self.get_url())
        assert_shared_control_files(control_dir)

    def test_can_convert(self):
        """A format 5 dir reports that it can be converted."""
        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        """A format 5 dir needs conversion only while it is not the default."""
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
        try:
            control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
            # Format 5 is the default right now, so nothing to convert.
            self.assertFalse(control_dir.needs_format_conversion())
        finally:
            bzrdir.BzrDirFormat._set_default_format(saved_default)
        # With the previous default restored the same dir needs converting.
        self.assertTrue(control_dir.needs_format_conversion())
 
656
 
 
657
 
 
658
class TestFormat6(TestCaseWithTransport):
    """Checks that apply only to the version 6 bzrdir format."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree expose the same control_files object.

        Only a single lockfiles instance should be created for format 6
        objects.
        """
        def assert_shared_control_files(control_dir):
            repo_files = control_dir.open_repository().control_files
            branch_files = control_dir.open_branch().control_files
            tree_files = control_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        # First on the object handed back by initialize() ...
        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        assert_shared_control_files(control_dir)
        # ... then on one obtained by opening the location normally.
        control_dir = bzrdir.BzrDir.open(self.get_url())
        assert_shared_control_files(control_dir)

    def test_can_convert(self):
        """A format 6 dir reports that it can be converted."""
        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        """A format 6 dir needs conversion while Meta1 is the default."""
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
        try:
            control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
            self.assertTrue(control_dir.needs_format_conversion())
        finally:
            # Always put the previous default back for later tests.
            bzrdir.BzrDirFormat._set_default_format(saved_default)
 
690
 
 
691
 
 
692
class NotBzrDir(bzrlib.bzrdir.BzrDir):
    """A control directory that is not based on '.bzr'."""

    def __init__(self, transport, format):
        # The control transport is the '.not' child of the root transport.
        self.transport = transport.clone('.not')
        self.root_transport = transport
        self._format = format
 
699
 
 
700
 
 
701
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
    """A test class representing any non-.bzr based disk format."""

    def initialize_on_transport(self, transport):
        """Initialize a new .not dir in the base directory of a Transport.

        :param transport: Transport in which the '.not' directory is made.
        :return: The NotBzrDir opened on transport.
        """
        transport.mkdir('.not')
        return self.open(transport)

    def open(self, transport):
        """Open this directory.

        :param transport: Root transport holding the '.not' directory.
        :return: A NotBzrDir bound to transport and to this format.
        """
        return NotBzrDir(transport, self)

    @classmethod
    def _known_formats(cls):
        """Return the set of formats this control format knows about."""
        return set([NotBzrDirFormat()])

    @classmethod
    def probe_transport(cls, transport):
        """Return our format if transport contains a '.not' entry.

        :param transport: Transport to look in.
        :return: A NotBzrDirFormat instance when '.not' is present.
        :raises NotBranchError: when '.not' is absent, so that callers
            iterating over the registered control formats can carry on with
            the next one instead of receiving None as the found format.
        """
        if transport.has('.not'):
            return NotBzrDirFormat()
        raise errors.NotBranchError(path=transport.base)
 
722
 
 
723
 
 
724
class TestNotBzrDir(TestCaseWithTransport):
    """Exercise the bzrdir api with a disk format that is not .bzr based.

    If/when one of these is in the core, we can let the implementation tests
    verify this works.
    """

    def test_create_and_find_format(self):
        """An initialized '.not' dir is reported by find_format."""
        not_format = NotBzrDirFormat()
        created = not_format.initialize(self.get_url())
        self.assertIsInstance(created, NotBzrDir)
        # Register the format for the duration of the probe only.
        control_format = bzrlib.bzrdir.BzrDirFormat
        control_format.register_control_format(not_format)
        try:
            probed = control_format.find_format(
                get_transport(self.get_url()))
            self.assertIsInstance(probed, NotBzrDirFormat)
        finally:
            control_format.unregister_control_format(not_format)

    def test_included_in_known_formats(self):
        """A registered NotBzrDirFormat shows up in known_formats()."""
        control_format = bzrlib.bzrdir.BzrDirFormat
        control_format.register_control_format(NotBzrDirFormat)
        try:
            formats = control_format.known_formats()
            matching = [candidate for candidate in formats
                        if isinstance(candidate, NotBzrDirFormat)]
            if not matching:
                self.fail("No NotBzrDirFormat in %s" % formats)
        finally:
            control_format.unregister_control_format(NotBzrDirFormat)
 
755
 
 
756
 
 
757
class NonLocalTests(TestCaseWithTransport):
    """Checks of bzrdir static behaviour on paths that are not local."""

    def setUp(self):
        super(NonLocalTests, self).setUp()
        # Serve the test locations from a MemoryServer.
        self.vfs_transport_factory = MemoryServer

    def test_create_branch_convenience(self):
        """create_branch_convenience gives a repository but no working tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        new_branch = bzrdir.BzrDir.create_branch_convenience(
            self.get_url('foo'), format=knit_format)
        control_dir = new_branch.bzrdir
        self.assertRaises(errors.NoWorkingTree, control_dir.open_workingtree)
        control_dir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """Forcing a tree at this url raises NotLocalUrl and creates nothing."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        target_url = self.get_url('foo')
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_branch_convenience,
                          target_url,
                          force_new_tree=True,
                          format=knit_format)
        # The failed call must not leave a 'foo' entry behind.
        parent = get_transport(self.get_url('.'))
        self.assertFalse(parent.has('foo'))

    def test_clone(self):
        """Cloning a local branch to this url gives branch and repo, no tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        local_branch = bzrdir.BzrDir.create_branch_convenience(
            'local', format=knit_format)
        # The local source is expected to have a working tree.
        local_branch.bzrdir.open_workingtree()
        cloned = local_branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree, cloned.open_workingtree)
        cloned.open_branch()
        cloned.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir picks WorkingTreeFormat3 when no tree is present."""
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        branch_dir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        checkout_format = branch_dir.checkout_metadir()
        self.assertIsInstance(checkout_format.workingtree_format,
                              workingtree.WorkingTreeFormat3)
 
804
 
 
805
 
 
806
class TestHTTPRedirectionLoop(object):
    """Mixin testing a redirection loop between two http servers.

    Daughter classes MUST also inherit from TestCaseWithTwoWebservers.

    This class does not inherit from TestCaseWithTwoWebservers itself,
    because the test framework would then try to create an instance of it,
    and that instance could not run: the implementation here is incomplete.
    """

    # Daughter classes should define this so that redirections keep using
    # the same transport implementation. That is not currently enforced as
    # it is a bit tricky to get right (see the FIXME in
    # BzrDir.open_from_transport for the only use case so far).
    _qualifier = None

    def create_transport_readonly_server(self):
        return HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        return HTTPServerRedirecting()

    def setUp(self):
        """Start both servers and make each one redirect to the other."""
        super(TestHTTPRedirectionLoop, self).setUp()
        # "new" is the redirection target, "old" the server being redirected.
        self.new_server = new_server = self.get_readonly_server()
        self.old_server = old_server = self.get_secondary_server()
        # Point each server at the other one to close the loop.
        old_server.redirect_to(new_server.host, new_server.port)
        new_server.redirect_to(old_server.host, old_server.port)

    def _qualified_url(self, host, port):
        """Build an 'http+<qualifier>' url for host and port."""
        url_parts = (self._qualifier, host, port)
        return 'http+%s://%s:%s' % url_parts

    def test_loop(self):
        """Opening a bzrdir from either server raises NotBranchError."""
        for server in (self.old_server, self.new_server):
            url = self._qualified_url(server.host, server.port)
            transport = self._transport(url)
            self.assertRaises(errors.NotBranchError,
                              bzrdir.BzrDir.open_from_transport, transport)
 
856
 
 
857
 
 
858
class TestHTTPRedirections_urllib(
        TestHTTPRedirectionLoop,
        TestCaseWithTwoWebservers):
    """Run the redirection loop test with the urllib implementation."""

    # Transport class and url qualifier read by TestHTTPRedirectionLoop.
    _transport = HttpTransport_urllib
    _qualifier = 'urllib'
 
864
 
 
865
 
 
866
 
 
867
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
 
868
                                  TestHTTPRedirectionLoop,
 
869
                                  TestCaseWithTwoWebservers):
 
870
    """Tests redirections for pycurl implementation"""
 
871
 
 
872
    _qualifier = 'pycurl'