~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_bzrdir.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-06-20 01:09:18 UTC
  • mfrom: (3505.1.1 ianc-integration)
  • Revision ID: pqm@pqm.ubuntu.com-20080620010918-64z4xylh1ap5hgyf
Accept user names with @s in URLs (Neil Martinsen-Burrell)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
2
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for the BzrDir facility and any format specific tests.
 
18
 
 
19
For interface contract tests, see tests/bzr_dir_implementations.
 
20
"""
 
21
 
 
22
import os.path
 
23
from StringIO import StringIO
 
24
import subprocess
 
25
import sys
 
26
 
 
27
from bzrlib import (
 
28
    bzrdir,
 
29
    errors,
 
30
    help_topics,
 
31
    repository,
 
32
    symbol_versioning,
 
33
    urlutils,
 
34
    win32utils,
 
35
    workingtree,
 
36
    )
 
37
import bzrlib.branch
 
38
from bzrlib.errors import (NotBranchError,
 
39
                           UnknownFormatError,
 
40
                           UnsupportedFormatError,
 
41
                           )
 
42
from bzrlib.tests import (
 
43
    TestCase,
 
44
    TestCaseWithTransport,
 
45
    TestSkipped,
 
46
    test_sftp_transport
 
47
    )
 
48
from bzrlib.tests.http_server import HttpServer
 
49
from bzrlib.tests.http_utils import (
 
50
    TestCaseWithTwoWebservers,
 
51
    HTTPServerRedirecting,
 
52
    )
 
53
from bzrlib.tests.test_http import TestWithTransport_pycurl
 
54
from bzrlib.transport import get_transport
 
55
from bzrlib.transport.http._urllib import HttpTransport_urllib
 
56
from bzrlib.transport.memory import MemoryServer
 
57
from bzrlib.repofmt import knitrepo, weaverepo
 
58
 
 
59
 
 
60
class TestDefaultFormat(TestCase):
    """Tests for reading and overriding the default BzrDirFormat."""

    def test_get_set_default_format(self):
        """A replaced default format is the one used by BzrDir.create().

        The previous default is put back in a ``finally`` clause so that a
        failure while the sample format is installed cannot leak into other
        tests.
        """
        old_format = bzrdir.BzrDirFormat.get_default_format()
        # The default is expected to be the metadir format.
        self.assertIsInstance(old_format, bzrdir.BzrDirMetaFormat1)
        bzrdir.BzrDirFormat._set_default_format(SampleBzrDirFormat())
        # Creating a bzr dir should now create an instrumented dir.
        try:
            result = bzrdir.BzrDir.create('memory:///')
            self.assertIsInstance(result, SampleBzrDir)
        finally:
            bzrdir.BzrDirFormat._set_default_format(old_format)
        self.assertEqual(old_format, bzrdir.BzrDirFormat.get_default_format())
 
74
 
 
75
 
 
76
class TestFormatRegistry(TestCase):
    """Tests for BzrDirFormatRegistry: registration, help text and defaults."""

    def make_format_registry(self):
        """Build a private registry filled with a fixed set of sample formats.

        The registry contains directly registered, lazily registered and
        metadir entries, some flagged deprecated, experimental or hidden.
        'knit' is set as the default.
        """
        weave_help = ('Pre-0.8 format.  Slower and does not support checkouts'
                      ' or shared repositories')
        lazy_help = 'Format registered lazily'
        knit3_name = 'bzrlib.repofmt.knitrepo.RepositoryFormatKnit3'
        knit3_help = 'Experimental successor to knit.  Use at your own risk.'
        branch6_name = 'bzrlib.branch.BzrBranchFormat6'

        registry = bzrdir.BzrDirFormatRegistry()
        registry.register('weave', bzrdir.BzrDirFormat6, weave_help,
                          deprecated=True)
        registry.register_lazy('lazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               lazy_help, deprecated=True)
        registry.register_metadir(
            'knit',
            'bzrlib.repofmt.knitrepo.RepositoryFormatKnit1',
            'Format using knits')
        registry.set_default('knit')
        registry.register_metadir('branch6', knit3_name, knit3_help,
                                  branch_format=branch6_name,
                                  experimental=True)
        registry.register_metadir('hidden format', knit3_name, knit3_help,
                                  branch_format=branch6_name, hidden=True)
        registry.register('hiddenweave', bzrdir.BzrDirFormat6, weave_help,
                          hidden=True)
        registry.register_lazy('hiddenlazy', 'bzrlib.bzrdir', 'BzrDirFormat6',
                               lazy_help, deprecated=True, hidden=True)
        return registry

    def test_format_registry(self):
        """make_bzrdir() yields the format registered under each key."""
        registry = self.make_format_registry()
        for key in ('lazy', 'weave'):
            self.assertIsInstance(registry.make_bzrdir(key),
                                  bzrdir.BzrDirFormat6)
        for key in ('default', 'knit'):
            self.assertIsInstance(registry.make_bzrdir(key).repository_format,
                                  knitrepo.RepositoryFormatKnit1)
        branch6_dir = registry.make_bzrdir('branch6')
        self.assertIsInstance(branch6_dir.get_branch_format(),
                              bzrlib.branch.BzrBranchFormat6)

    def test_get_help(self):
        """get_help() returns the help string given at registration."""
        registry = self.make_format_registry()
        expected_help = [
            ('lazy', 'Format registered lazily'),
            ('knit', 'Format using knits'),
            ('default', 'Format using knits'),
            ('weave', 'Pre-0.8 format.  Slower and does not support'
                      ' checkouts or shared repositories'),
            ]
        for key, help_text in expected_help:
            self.assertEqual(help_text, registry.get_help(key))

    def test_help_topic(self):
        """The generated help topic groups formats and omits hidden ones."""
        topics = help_topics.HelpTopicRegistry()
        registry = self.make_format_registry()
        topics.register('formats', registry.help_topic, 'Directory formats')
        detail = topics.get_detail('formats')
        # Cut the topic text into its three sections using the headings.
        new, remainder = detail.split('Experimental formats')
        experimental, deprecated = remainder.split('Deprecated formats')
        self.assertContainsRe(new, 'These formats can be used')
        self.assertContainsRe(
            new, ':knit:\n    \(native\) \(default\) Format using knits\n')
        self.assertContainsRe(
            experimental,
            ':branch6:\n    \(native\) Experimental successor to knit')
        self.assertContainsRe(
            deprecated, ':lazy:\n    \(native\) Format registered lazily\n')
        self.assertNotContainsRe(new, 'hidden')

    def test_set_default_repository(self):
        """set_default_repository() changes both bzrdir and repo defaults."""
        global_registry = bzrdir.format_registry
        default_factory = global_registry.get('default')
        # Find the real key the 'default' entry currently points at so it
        # can be restored afterwards.
        candidates = [name for name, factory in global_registry.iteritems()
                      if factory == default_factory and name != 'default']
        old_default = candidates[0]
        global_registry.set_default_repository('dirstate-with-subtree')
        try:
            self.assertIs(global_registry.get('dirstate-with-subtree'),
                          global_registry.get('default'))
            default_repo_format = repository.RepositoryFormat.get_default_format()
            self.assertIs(default_repo_format.__class__,
                          knitrepo.RepositoryFormatKnit3)
        finally:
            global_registry.set_default_repository(old_default)

    def test_aliases(self):
        """aliases() reports only the keys registered with alias=True."""
        weave_help = ('Pre-0.8 format.  Slower and does not support checkouts'
                      ' or shared repositories')
        registry = bzrdir.BzrDirFormatRegistry()
        registry.register('weave', bzrdir.BzrDirFormat6, weave_help,
                          deprecated=True)
        registry.register('weavealias', bzrdir.BzrDirFormat6, weave_help,
                          deprecated=True, alias=True)
        self.assertEqual(frozenset(['weavealias']), registry.aliases())
 
176
    
 
177
 
 
178
class SampleBranch(bzrlib.branch.Branch):
    """Stub Branch used by the sample bzrdir; it only records its bzrdir."""

    def __init__(self, dir):
        # Note: Branch.__init__ is not called; only the owning control
        # directory is stored.
        self.bzrdir = dir
 
183
 
 
184
 
 
185
class SampleBzrDir(bzrdir.BzrDir):
    """Stub BzrDir whose factory methods return easily recognised values.

    Used to check that the static convenience methods on BzrDir dispatch to
    the bzrdir produced by the requested format.
    """

    def create_repository(self, shared=False):
        """Return a placeholder string instead of a real repository."""
        return "A repository"

    def open_repository(self):
        """Return a placeholder string instead of a real repository."""
        return "A repository"

    def create_branch(self):
        """Return a SampleBranch attached to this bzrdir."""
        return SampleBranch(self)

    def create_workingtree(self):
        """Return a placeholder string instead of a real working tree."""
        return "A tree"
 
203
 
 
204
 
 
205
class SampleBzrDirFormat(bzrdir.BzrDirFormat):
    """A sample control directory format.

    The format can be initialized but reports itself as unsupported, which
    helps when testing the open and open_downlevel routines.
    """

    def get_format_string(self):
        """Return the marker text written to .bzr/branch-format."""
        return "Sample .bzr dir format."

    def initialize_on_transport(self, t):
        """Create a .bzr directory on transport t and return a SampleBzrDir."""
        t.mkdir('.bzr')
        marker = self.get_format_string()
        t.put_bytes('.bzr/branch-format', marker)
        return SampleBzrDir(t, self)

    def is_supported(self):
        """Always False: this format is reported as unsupported."""
        return False

    def open(self, transport, _found=None):
        """Return a fixed string rather than a real bzrdir object."""
        return "opened branch."
 
227
 
 
228
 
 
229
class TestBzrDirFormat(TestCaseWithTransport):
    """Tests for the BzrDirFormat facility."""

    def test_find_format(self):
        """find_format() returns the format a directory was initialized with."""
        # Is the right format object found for a branch?
        # Create control directories with a few known format objects.
        t = get_transport(self.get_url())
        self.build_tree(["foo/", "bar/"], transport=t)

        def check_format(fmt, url):
            fmt.initialize(url)
            found_format = bzrdir.BzrDirFormat.find_format(get_transport(url))
            self.assertIsInstance(found_format, fmt.__class__)

        check_format(bzrdir.BzrDirFormat5(), "foo")
        check_format(bzrdir.BzrDirFormat6(), "bar")

    def test_find_format_nothing_there(self):
        """find_format() raises NotBranchError when there is no .bzr."""
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_find_format_unknown_format(self):
        """An unrecognised branch-format file raises UnknownFormatError."""
        t = get_transport(self.get_url())
        t.mkdir('.bzr')
        t.put_bytes('.bzr/branch-format', '')
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDirFormat.find_format,
                          get_transport('.'))

    def test_register_unregister_format(self):
        """Registering a format makes it findable; unregistering undoes that.

        The sample format is unregistered in a ``finally`` clause so that a
        failing assertion cannot leave it in the global format list.
        """
        format = SampleBzrDirFormat()
        url = self.get_url()
        # make a bzrdir
        format.initialize(url)
        # register a format for it.
        bzrdir.BzrDirFormat.register_format(format)
        try:
            # which bzrdir.Open will refuse (not supported)
            self.assertRaises(UnsupportedFormatError, bzrdir.BzrDir.open, url)
            # which bzrdir.open_containing will refuse (not supported)
            self.assertRaises(UnsupportedFormatError,
                              bzrdir.BzrDir.open_containing, url)
            # but open_downlevel will work
            t = get_transport(url)
            self.assertEqual(format.open(t),
                             bzrdir.BzrDir.open_unsupported(url))
        finally:
            # unregister the format
            bzrdir.BzrDirFormat.unregister_format(format)
        # now open_downlevel should fail too.
        self.assertRaises(UnknownFormatError,
                          bzrdir.BzrDir.open_unsupported, url)

    def test_create_branch_and_repo_uses_default(self):
        """create_branch_and_repo() uses the format it is given."""
        format = SampleBzrDirFormat()
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url(),
                                                      format=format)
        self.assertIsInstance(branch, SampleBranch)

    def test_create_branch_and_repo_under_shared(self):
        """Inside a shared repo, no new repository is created."""
        # creating a branch and repo in a shared repo uses the
        # shared repository
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(
            self.get_url('child'), format=format)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_and_repo_under_shared_force_new(self):
        """force_new_repo=True creates a repository despite a shared one."""
        # creating a branch and repo in a shared repo can be forced to
        # make a new repo
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_and_repo(self.get_url('child'),
                                                      force_new_repo=True,
                                                      format=format)
        branch.bzrdir.open_repository()

    def test_create_standalone_working_tree(self):
        """Standalone trees need a local URL and use the given format."""
        format = SampleBzrDirFormat()
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url(), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('.',
                                                           format=format)
        self.assertEqual('A tree', tree)

    def test_create_standalone_working_tree_under_shared_repo(self):
        """A standalone tree gets its own repo even under a shared repo."""
        # create standalone working tree always makes a repo.
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        # note this is deliberately readonly, as this failure should
        # occur before any writes.
        self.assertRaises(errors.NotLocalUrl,
                          bzrdir.BzrDir.create_standalone_workingtree,
                          self.get_readonly_url('child'), format=format)
        tree = bzrdir.BzrDir.create_standalone_workingtree('child',
                                                           format=format)
        tree.bzrdir.open_repository()

    def test_create_branch_convenience(self):
        """Outside a repo the result has a repo, a branch and a tree."""
        # outside a repo the default convenience output is a repo+branch_tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience('.', format=format)
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_possible_transports(self):
        """Check that the optional 'possible_transports' is recognized"""
        format = bzrdir.format_registry.make_bzrdir('knit')
        t = self.get_transport()
        branch = bzrdir.BzrDir.create_branch_convenience(
            '.', format=format, possible_transports=[t])
        branch.bzrdir.open_workingtree()
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_root(self):
        """Creating a branch at the root of a fs should work."""
        self.vfs_transport_factory = MemoryServer
        # On this non-local transport a repo and branch are made, but no
        # working tree.
        format = bzrdir.format_registry.make_bzrdir('knit')
        branch = bzrdir.BzrDir.create_branch_convenience(self.get_url(),
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        branch.bzrdir.open_repository()

    def test_create_branch_convenience_under_shared_repo(self):
        """Under a shared repo: branch plus tree, but no new repository."""
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_no_tree(self):
        """force_new_tree=False suppresses the working tree."""
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_tree=False, format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy(self):
        """A shared repo with trees disabled yields a treeless branch."""
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience('child',
                                                         format=format)
        self.assertRaises(errors.NoWorkingTree,
                          branch.bzrdir.open_workingtree)
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_no_tree_policy_force_tree(self):
        """force_new_tree=True overrides a repo policy of no trees."""
        # inside a repo the default convenience output is a branch+ follow the
        # repo tree policy but we can override that
        format = bzrdir.format_registry.make_bzrdir('knit')
        repo = self.make_repository('.', shared=True, format=format)
        repo.set_make_working_trees(False)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_tree=True, format=format)
        branch.bzrdir.open_workingtree()
        self.assertRaises(errors.NoRepositoryPresent,
                          branch.bzrdir.open_repository)

    def test_create_branch_convenience_under_shared_repo_force_new_repo(self):
        """force_new_repo=True gives repo, branch and tree under a shared repo."""
        # inside a repo the default convenience output is overridable to give
        # repo+branch+tree
        format = bzrdir.format_registry.make_bzrdir('knit')
        self.make_repository('.', shared=True, format=format)
        branch = bzrdir.BzrDir.create_branch_convenience(
            'child', force_new_repo=True, format=format)
        branch.bzrdir.open_repository()
        branch.bzrdir.open_workingtree()
 
412
 
 
413
 
 
414
class TestRepositoryAcquisitionPolicy(TestCaseWithTransport):
    """Tests for the policy a bzrdir uses to obtain its repository."""

    def test_acquire_repository_standalone(self):
        """The default acquisition policy should create a standalone branch."""
        control = self.make_bzrdir('.')
        policy = control.determine_repository_policy()
        acquired = policy.acquire_repository()
        # The repository lives in the same control directory and is unshared.
        self.assertEqual(acquired.bzrdir.root_transport.base,
                         control.root_transport.base)
        self.assertFalse(acquired.is_shared())
 
424
 
 
425
 
 
426
class ChrootedTests(TestCaseWithTransport):
    """A support class that provides readonly urls outside the local namespace.

    This is done by checking if self.vfs_transport_factory is MemoryServer.
    If it is then we are chrooted already; if it is not then an HttpServer is
    used for readonly urls.
    """

    def setUp(self):
        super(ChrootedTests, self).setUp()
        if self.vfs_transport_factory != MemoryServer:
            self.transport_readonly_server = HttpServer

    def _local_branch_path(self, branch):
        """Return the real local filesystem path for branch.base."""
        return os.path.realpath(urlutils.local_path_from_url(branch.base))

    def test_open_containing(self):
        """open_containing() finds a bzrdir at or above the url."""
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                          self.get_readonly_url(''))
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_containing,
                          self.get_readonly_url('g/p/q'))
        bzrdir.BzrDir.create(self.get_url())
        branch, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url(''))
        self.assertEqual('', relpath)
        branch, relpath = bzrdir.BzrDir.open_containing(
            self.get_readonly_url('g/p/q'))
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_from_transport(self):
        """open_containing_from_transport() mirrors open_containing()."""
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDir.open_containing_from_transport,
                          get_transport(self.get_readonly_url('')))
        self.assertRaises(NotBranchError,
                          bzrdir.BzrDir.open_containing_from_transport,
                          get_transport(self.get_readonly_url('g/p/q')))
        bzrdir.BzrDir.create(self.get_url())
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
            get_transport(self.get_readonly_url('')))
        self.assertEqual('', relpath)
        branch, relpath = bzrdir.BzrDir.open_containing_from_transport(
            get_transport(self.get_readonly_url('g/p/q')))
        self.assertEqual('g/p/q', relpath)

    def test_open_containing_tree_or_branch(self):
        """The tree is returned only for local paths that have one."""
        self.make_branch_and_tree('topdir')
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            'topdir/foo')
        self.assertEqual(os.path.realpath('topdir'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         self._local_branch_path(branch))
        self.assertIs(tree.bzrdir, branch.bzrdir)
        self.assertEqual('foo', relpath)
        # opening from non-local should not return the tree
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            self.get_readonly_url('topdir/foo'))
        self.assertEqual(None, tree)
        self.assertEqual('foo', relpath)
        # without a tree:
        self.make_branch('topdir/foo')
        tree, branch, relpath = bzrdir.BzrDir.open_containing_tree_or_branch(
            'topdir/foo')
        self.assertIs(tree, None)
        self.assertEqual(os.path.realpath('topdir/foo'),
                         self._local_branch_path(branch))
        self.assertEqual('', relpath)

    def test_open_tree_or_branch(self):
        """open_tree_or_branch() returns (tree, branch), tree may be None."""
        self.make_branch_and_tree('topdir')
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir')
        self.assertEqual(os.path.realpath('topdir'),
                         os.path.realpath(tree.basedir))
        self.assertEqual(os.path.realpath('topdir'),
                         self._local_branch_path(branch))
        self.assertIs(tree.bzrdir, branch.bzrdir)
        # opening from non-local should not return the tree
        tree, branch = bzrdir.BzrDir.open_tree_or_branch(
            self.get_readonly_url('topdir'))
        self.assertEqual(None, tree)
        # without a tree:
        self.make_branch('topdir/foo')
        tree, branch = bzrdir.BzrDir.open_tree_or_branch('topdir/foo')
        self.assertIs(tree, None)
        self.assertEqual(os.path.realpath('topdir/foo'),
                         self._local_branch_path(branch))

    def test_open_from_transport(self):
        """The opened bzrdir is rooted at the transport it was opened from."""
        # transport pointing at bzrdir should give a bzrdir with root transport
        # set to the given transport
        bzrdir.BzrDir.create(self.get_url())
        transport = get_transport(self.get_url())
        opened_bzrdir = bzrdir.BzrDir.open_from_transport(transport)
        self.assertEqual(transport.base, opened_bzrdir.root_transport.base)
        self.assertIsInstance(opened_bzrdir, bzrdir.BzrDir)

    def test_open_from_transport_no_bzrdir(self):
        """Opening where there is no bzrdir raises NotBranchError."""
        transport = get_transport(self.get_url())
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                          transport)

    def test_open_from_transport_bzrdir_in_parent(self):
        """open_from_transport() does not search parent directories."""
        bzrdir.BzrDir.create(self.get_url())
        transport = get_transport(self.get_url())
        transport.mkdir('subdir')
        transport = transport.clone('subdir')
        self.assertRaises(NotBranchError, bzrdir.BzrDir.open_from_transport,
                          transport)

    def test_sprout_recursive(self):
        """Sprouting a tree with a subtree reference populates the subtree."""
        tree = self.make_branch_and_tree('tree1',
                                         format='dirstate-with-subtree')
        sub_tree = self.make_branch_and_tree('tree1/subtree',
                                             format='dirstate-with-subtree')
        tree.add_reference(sub_tree)
        self.build_tree(['tree1/subtree/file'])
        sub_tree.add('file')
        tree.commit('Initial commit')
        tree.bzrdir.sprout('tree2')
        self.failUnlessExists('tree2/subtree/file')

    def test_cloning_metadir(self):
        """Ensure that cloning metadir is suitable"""
        # Named a_bzrdir so the bzrdir module is not shadowed.
        a_bzrdir = self.make_bzrdir('bzrdir')
        a_bzrdir.cloning_metadir()
        branch = self.make_branch('branch', format='knit')
        format = branch.bzrdir.cloning_metadir()
        self.assertIsInstance(format.workingtree_format,
                              workingtree.WorkingTreeFormat3)

    def test_sprout_recursive_treeless(self):
        """Sprouting into a no-trees repo creates the subtree without files."""
        tree = self.make_branch_and_tree('tree1',
                                         format='dirstate-with-subtree')
        sub_tree = self.make_branch_and_tree('tree1/subtree',
                                             format='dirstate-with-subtree')
        tree.add_reference(sub_tree)
        self.build_tree(['tree1/subtree/file'])
        sub_tree.add('file')
        tree.commit('Initial commit')
        tree.bzrdir.destroy_workingtree()
        repo = self.make_repository('repo', shared=True,
                                    format='dirstate-with-subtree')
        repo.set_make_working_trees(False)
        tree.bzrdir.sprout('repo/tree2')
        self.failUnlessExists('repo/tree2/subtree')
        self.failIfExists('repo/tree2/subtree/file')

    def make_foo_bar_baz(self):
        """Create branches at foo, foo/bar and baz; return their bzrdirs."""
        foo = bzrdir.BzrDir.create_branch_convenience('foo').bzrdir
        bar = self.make_branch('foo/bar').bzrdir
        baz = self.make_branch('baz').bzrdir
        return foo, bar, baz

    def test_find_bzrdirs(self):
        """find_bzrdirs() yields every bzrdir under the transport."""
        foo, bar, baz = self.make_foo_bar_baz()
        transport = get_transport(self.get_url())
        self.assertEqualBzrdirs([baz, foo, bar],
                                bzrdir.BzrDir.find_bzrdirs(transport))

    def test_find_bzrdirs_list_current(self):
        """A custom list_current callback controls which dirs are visited."""
        def list_current(transport):
            return [s for s in transport.list_dir('') if s != 'baz']

        foo, bar, baz = self.make_foo_bar_baz()
        transport = get_transport(self.get_url())
        self.assertEqualBzrdirs([foo, bar],
                                bzrdir.BzrDir.find_bzrdirs(
                                    transport, list_current=list_current))

    def test_find_bzrdirs_evaluate(self):
        """A custom evaluate callback picks the value and whether to recurse."""
        def evaluate(bzrdir):
            # The parameter shadows the bzrdir module inside this function.
            try:
                bzrdir.open_repository()
            except errors.NoRepositoryPresent:
                # Only bzrlib.errors is imported as a module here; the bare
                # name NoRepositoryPresent is not in scope.
                return True, bzrdir.root_transport.base
            else:
                return False, bzrdir.root_transport.base

        foo, bar, baz = self.make_foo_bar_baz()
        transport = get_transport(self.get_url())
        self.assertEqual([baz.root_transport.base, foo.root_transport.base],
                         list(bzrdir.BzrDir.find_bzrdirs(transport,
                                                         evaluate=evaluate)))

    def assertEqualBzrdirs(self, first, second):
        """Assert two iterables of bzrdirs have pairwise equal root URLs."""
        first = list(first)
        second = list(second)
        self.assertEqual(len(first), len(second))
        for x, y in zip(first, second):
            self.assertEqual(x.root_transport.base, y.root_transport.base)

    def test_find_branches(self):
        """find_branches() returns the branches found under the transport."""
        self.make_repository('', shared=True)
        foo, bar, baz = self.make_foo_bar_baz()
        # A control directory without a branch.
        self.make_bzrdir('foo/qux')
        transport = get_transport(self.get_url())
        branches = bzrdir.BzrDir.find_branches(transport)
        self.assertEqual(baz.root_transport.base, branches[0].base)
        self.assertEqual(foo.root_transport.base, branches[1].base)
        self.assertEqual(bar.root_transport.base, branches[2].base)

        # ensure this works without a top-level repo
        branches = bzrdir.BzrDir.find_branches(transport.clone('foo'))
        self.assertEqual(foo.root_transport.base, branches[0].base)
        self.assertEqual(bar.root_transport.base, branches[1].base)
 
632
 
 
633
 
 
634
class TestMeta1DirFormat(TestCaseWithTransport):
    """Checks that only make sense for BzrDirMetaFormat1 directories."""

    def test_right_base_dirs(self):
        """Branch, repository and checkout transports use fixed sub-paths.

        Each component transport is compared against a clone of the control
        transport ('branch', 'repository', 'checkout'), first asking with
        None and then with a concrete format object.
        """
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control = meta_dir.transport

        branch_base = control.clone('branch').base
        found = meta_dir.get_branch_transport(None)
        self.assertEqual(branch_base, found.base)
        found = meta_dir.get_branch_transport(
            bzrlib.branch.BzrBranchFormat5())
        self.assertEqual(branch_base, found.base)

        repository_base = control.clone('repository').base
        found = meta_dir.get_repository_transport(None)
        self.assertEqual(repository_base, found.base)
        found = meta_dir.get_repository_transport(
            weaverepo.RepositoryFormat7())
        self.assertEqual(repository_base, found.base)

        checkout_base = control.clone('checkout').base
        found = meta_dir.get_workingtree_transport(None)
        self.assertEqual(checkout_base, found.base)
        found = meta_dir.get_workingtree_transport(
            workingtree.WorkingTreeFormat3())
        self.assertEqual(checkout_base, found.base)

    def test_meta1dir_uses_lockdir(self):
        """A new meta1 dir has 'branch-lock' as a directory, not a file."""
        meta_dir = bzrdir.BzrDirMetaFormat1().initialize(self.get_url())
        control = meta_dir.transport
        self.assertIsDirectory('branch-lock', control)

    def test_comparison(self):
        """== and != agree with each other for metadir formats.

        Two 'knit' formats compare equal (including a format with itself);
        a 'dirstate-with-subtree' format compares unequal to 'knit'.
        """
        make_format = bzrdir.format_registry.make_bzrdir
        knit_format = make_format('knit')
        self.assertEqual(knit_format, knit_format)
        self.assertFalse(knit_format != knit_format)

        second_knit = make_format('knit')
        self.assertEqual(second_knit, knit_format)
        self.assertFalse(second_knit != knit_format)

        subtree_format = make_format('dirstate-with-subtree')
        self.assertNotEqual(subtree_format, knit_format)
        self.assertFalse(subtree_format == knit_format)

    def test_needs_conversion_different_working_tree(self):
        """A 'knit' tree needs conversion while 'dirstate' is the default."""
        # The process-wide default is swapped for the duration of the check
        # and always put back afterwards.
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        dirstate_default = bzrdir.format_registry.make_bzrdir('dirstate')
        bzrdir.BzrDirFormat._set_default_format(dirstate_default)
        try:
            knit_tree = self.make_branch_and_tree('tree', format='knit')
            self.assertTrue(knit_tree.bzrdir.needs_format_conversion())
        finally:
            bzrdir.BzrDirFormat._set_default_format(saved_default)
 
686
 
 
687
 
 
688
class TestFormat5(TestCaseWithTransport):
    """Checks that only make sense for BzrDirFormat5 directories."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree share one control_files object.

        Verified on the object returned by initialize() and again on a
        directory obtained through BzrDir.open().
        """
        def assert_shared_control_files(a_dir):
            repo_files = a_dir.open_repository().control_files
            branch_files = a_dir.open_branch().control_files
            tree_files = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        assert_shared_control_files(control_dir)
        # Same check on a freshly opened directory.
        control_dir = bzrdir.BzrDir.open(self.get_url())
        assert_shared_control_files(control_dir)

    def test_can_convert(self):
        """A format 5 directory reports that it can be converted."""
        control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        """Conversion is needed exactly when format 5 is not the default.

        While format 5 is installed as the default no conversion is
        reported; once the previous default is restored it is.
        """
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirFormat5())
        try:
            control_dir = bzrdir.BzrDirFormat5().initialize(self.get_url())
            self.assertFalse(control_dir.needs_format_conversion())
        finally:
            bzrdir.BzrDirFormat._set_default_format(saved_default)
        self.assertTrue(control_dir.needs_format_conversion())
 
722
 
 
723
 
 
724
class TestFormat6(TestCaseWithTransport):
    """Checks that only make sense for BzrDirFormat6 directories."""

    def test_same_lockfiles_between_tree_repo_branch(self):
        """Repository, branch and tree share one control_files object.

        Verified on the object returned by initialize() and again on a
        directory obtained through BzrDir.open().
        """
        def assert_shared_control_files(a_dir):
            repo_files = a_dir.open_repository().control_files
            branch_files = a_dir.open_branch().control_files
            tree_files = a_dir.open_workingtree()._control_files
            self.assertTrue(repo_files is branch_files)
            self.assertTrue(branch_files is tree_files)

        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        assert_shared_control_files(control_dir)
        # Same check on a freshly opened directory.
        control_dir = bzrdir.BzrDir.open(self.get_url())
        assert_shared_control_files(control_dir)

    def test_can_convert(self):
        """A format 6 directory reports that it can be converted."""
        control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
        self.assertTrue(control_dir.can_convert_format())

    def test_needs_conversion(self):
        """A format 6 directory needs conversion when meta1 is the default."""
        saved_default = bzrdir.BzrDirFormat.get_default_format()
        bzrdir.BzrDirFormat._set_default_format(bzrdir.BzrDirMetaFormat1())
        try:
            control_dir = bzrdir.BzrDirFormat6().initialize(self.get_url())
            self.assertTrue(control_dir.needs_format_conversion())
        finally:
            bzrdir.BzrDirFormat._set_default_format(saved_default)
 
756
 
 
757
 
 
758
class NotBzrDir(bzrlib.bzrdir.BzrDir):
    """Control directory whose files live under '.not' instead of '.bzr'."""

    def __init__(self, transport, format):
        """Record the format and derive the control transport.

        :param transport: transport for the directory containing '.not';
            stored as ``root_transport``.
        :param format: stored as ``_format``.
        """
        # The base class initialiser is not called; the three attributes
        # below are the only state set up here.
        control_transport = transport.clone('.not')
        self._format = format
        self.root_transport = transport
        self.transport = control_transport
 
765
 
 
766
 
 
767
class NotBzrDirFormat(bzrlib.bzrdir.BzrDirFormat):
    """Test-only format for control directories kept in '.not'."""

    def initialize_on_transport(self, transport):
        """Create '.not' at the base of ``transport`` and open it.

        :param transport: transport in which the '.not' directory is made.
        :return: whatever ``self.open(transport)`` returns.
        """
        transport.mkdir('.not')
        return self.open(transport)

    def open(self, transport):
        """Return a NotBzrDir for ``transport`` bound to this format.

        :param transport: transport handed to the NotBzrDir constructor.
        """
        return NotBzrDir(transport, self)

    @classmethod
    def _known_formats(cls):
        """Return a set holding one new NotBzrDirFormat instance."""
        return set([NotBzrDirFormat()])

    @classmethod
    def probe_transport(cls, transport):
        """Report whether ``transport`` contains a '.not' entry.

        :param transport: transport queried with ``has('.not')``.
        :return: a new NotBzrDirFormat when the entry exists, else None.
        """
        if transport.has('.not'):
            return NotBzrDirFormat()
        # NOTE(review): no exception is raised for a missing '.not'; confirm
        # that callers probing formats cope with a None result.
        return None
 
788
 
 
789
 
 
790
class TestNotBzrDir(TestCaseWithTransport):
    """Drive the bzrdir API with a format that does not live in '.bzr'.

    Should such a format ever ship in the core, the implementation tests
    could cover this instead.
    """

    def test_create_and_find_format(self):
        """A registered NotBzrDirFormat is returned by find_format."""
        # Create a '.not' control directory through the format.
        not_format = NotBzrDirFormat()
        created = not_format.initialize(self.get_url())
        self.assertIsInstance(created, NotBzrDir)
        # Probe for it while the format is registered; always unregister.
        format_class = bzrlib.bzrdir.BzrDirFormat
        format_class.register_control_format(not_format)
        try:
            probed = format_class.find_format(get_transport(self.get_url()))
            self.assertIsInstance(probed, NotBzrDirFormat)
        finally:
            format_class.unregister_control_format(not_format)

    def test_included_in_known_formats(self):
        """known_formats() includes a NotBzrDirFormat once registered."""
        format_class = bzrlib.bzrdir.BzrDirFormat
        # Note that the class itself, not an instance, is registered here.
        format_class.register_control_format(NotBzrDirFormat)
        try:
            formats = format_class.known_formats()
            matches = [candidate for candidate in formats
                       if isinstance(candidate, NotBzrDirFormat)]
            if not matches:
                self.fail("No NotBzrDirFormat in %s" % formats)
        finally:
            format_class.unregister_control_format(NotBzrDirFormat)
 
821
 
 
822
 
 
823
class NonLocalTests(TestCaseWithTransport):
    """BzrDir static helpers exercised against non-local locations."""

    def setUp(self):
        """Run the base setUp, then select MemoryServer for vfs transports."""
        super(NonLocalTests, self).setUp()
        self.vfs_transport_factory = MemoryServer

    def test_create_branch_convenience(self):
        """The new branch has a repository but no working tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        target_url = self.get_url('foo')
        new_branch = bzrdir.BzrDir.create_branch_convenience(
            target_url, format=knit_format)
        self.assertRaises(errors.NoWorkingTree,
                          new_branch.bzrdir.open_workingtree)
        new_branch.bzrdir.open_repository()

    def test_create_branch_convenience_force_tree_not_local_fails(self):
        """force_new_tree=True raises NotLocalUrl and leaves no 'foo'."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        self.assertRaises(
            errors.NotLocalUrl,
            bzrdir.BzrDir.create_branch_convenience,
            self.get_url('foo'),
            force_new_tree=True,
            format=knit_format)
        # Nothing called 'foo' may exist after the failed attempt.
        parent = get_transport(self.get_url('.'))
        self.assertFalse(parent.has('foo'))

    def test_clone(self):
        """Cloning a local branch to a test URL yields no working tree."""
        knit_format = bzrdir.format_registry.make_bzrdir('knit')
        local_branch = bzrdir.BzrDir.create_branch_convenience(
            'local', format=knit_format)
        # The source is expected to have a working tree.
        local_branch.bzrdir.open_workingtree()
        cloned = local_branch.bzrdir.clone(self.get_url('remote'))
        self.assertRaises(errors.NoWorkingTree, cloned.open_workingtree)
        # The clone must still provide a branch and a repository.
        cloned.open_branch()
        cloned.open_repository()

    def test_checkout_metadir(self):
        """checkout_metadir() reports a WorkingTreeFormat3 tree format.

        The bzrdir queried was created with make_branch, i.e. without
        explicitly creating a working tree.
        """
        self.make_branch('branch-knit2', format='dirstate-with-subtree')
        branch_dir = bzrdir.BzrDir.open(self.get_url('branch-knit2'))
        metadir = branch_dir.checkout_metadir()
        self.assertIsInstance(metadir.workingtree_format,
                              workingtree.WorkingTreeFormat3)
 
870
 
 
871
 
 
872
class TestHTTPRedirectionLoop(object):
    """Mixin checking a redirection loop between two http servers.

    Concrete test classes MUST combine this mixin with
    TestCaseWithTwoWebservers.  The mixin does not derive from
    TestCaseWithTwoWebservers itself: being incomplete on its own, it must
    not be instantiated and run directly by the test framework.
    """

    # Daughter classes set this so that redirections keep using the same
    # transport implementation.  This is not enforced yet because it is a
    # bit tricky to get right; see the FIXME in BzrDir.open_from_transport
    # for the only use case so far.
    _qualifier = None

    def create_transport_readonly_server(self):
        """Return a new HTTPServerRedirecting."""
        return HTTPServerRedirecting()

    def create_transport_secondary_server(self):
        """Return a new HTTPServerRedirecting."""
        return HTTPServerRedirecting()

    def setUp(self):
        """Make the two servers redirect to each other, forming a loop."""
        super(TestHTTPRedirectionLoop, self).setUp()
        # 'new' is the readonly server, 'old' the secondary one.
        new_server = self.new_server = self.get_readonly_server()
        old_server = self.old_server = self.get_secondary_server()
        # old -> new and new -> old closes the loop.
        old_server.redirect_to(new_server.host, new_server.port)
        new_server.redirect_to(old_server.host, old_server.port)

    def _qualified_url(self, host, port):
        """Return an 'http+<qualifier>://host:port' URL."""
        parts = (self._qualifier, host, port)
        return 'http+%s://%s:%s' % parts

    def test_loop(self):
        """Opening a bzrdir from either server raises NotBranchError."""
        # Old server first, then the new one; both must end up looping.
        for server in (self.old_server, self.new_server):
            url = self._qualified_url(server.host, server.port)
            # self._transport is not defined here; it is expected to come
            # from the concrete test class.
            looping_transport = self._transport(url)
            self.assertRaises(errors.NotBranchError,
                              bzrdir.BzrDir.open_from_transport,
                              looping_transport)
 
922
 
 
923
 
 
924
class TestHTTPRedirections_urllib(TestHTTPRedirectionLoop,
                                  TestCaseWithTwoWebservers):
    """Redirection loop checks bound to the urllib http implementation."""

    # Transport class used to build the transports under test.
    _transport = HttpTransport_urllib
    # Inserted after 'http+' when the test URLs are built.
    _qualifier = 'urllib'
 
930
 
 
931
 
 
932
 
 
933
class TestHTTPRedirections_pycurl(TestWithTransport_pycurl,
                                  TestHTTPRedirectionLoop,
                                  TestCaseWithTwoWebservers):
    """Redirection loop checks bound to the pycurl http implementation."""

    # Inserted after 'http+' when the test URLs are built.  No _transport is
    # set here; presumably TestWithTransport_pycurl supplies it.
    _qualifier = 'pycurl'
 
939
 
 
940
 
 
941
class TestDotBzrHidden(TestCaseWithTransport):
    """A newly created bzrdir must not appear in a plain directory listing."""

    # Command line used to list the current directory: 'ls' by default, the
    # command interpreter's 'dir /B' on win32.
    ls = ['ls']
    if sys.platform == 'win32':
        ls = [os.environ['COMSPEC'], '/C', 'dir', '/B']

    def get_ls(self):
        """Run the listing command and return its stdout split into lines.

        The test fails if the command exits with a non-zero status; the
        failure message includes the command and its stderr.
        """
        f = subprocess.Popen(self.ls, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = f.communicate()
        self.assertEqual(0, f.returncode, 'Calling %s failed: %s'
                         % (self.ls, err))
        return out.splitlines()

    def _require_hiding_support(self):
        """Skip the current test on win32 when win32file is unavailable."""
        if sys.platform == 'win32' and not win32utils.has_win32file:
            raise TestSkipped('unable to make file hidden without pywin32 library')

    def test_dot_bzr_hidden(self):
        """Creating a bzrdir at '.' leaves only 'a' in the listing."""
        self._require_hiding_support()
        bzrdir.BzrDir.create('.')
        self.build_tree(['a'])
        # assertEqual, matching get_ls and the rest of this file.
        self.assertEqual(['a'], self.get_ls())

    def test_dot_bzr_hidden_with_url(self):
        """Same check with the location given as a local URL."""
        self._require_hiding_support()
        bzrdir.BzrDir.create(urlutils.local_path_to_url('.'))
        self.build_tree(['a'])
        self.assertEqual(['a'], self.get_ls())