~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/repository_implementations/test_repository.py

  • Committer: Martin Pool
  • Date: 2006-02-22 04:29:54 UTC
  • mfrom: (1566 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1569.
  • Revision ID: mbp@sourcefrog.net-20060222042954-60333f08dd56a646
[merge] from bzr.dev before integration
Fix undefined ordering in sign_my_revisions breaking tests

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# (C) 2005, 2006 Canonical Ltd
 
2
 
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
 
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
 
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for bzrdir implementations - tests a bzrdir format."""
 
18
 
 
19
import os
 
20
import sys
 
21
 
 
22
import bzrlib
 
23
import bzrlib.bzrdir as bzrdir
 
24
from bzrlib.branch import Branch, needs_read_lock, needs_write_lock
 
25
from bzrlib.commit import commit
 
26
import bzrlib.errors as errors
 
27
from bzrlib.errors import (FileExists,
 
28
                           NoSuchRevision,
 
29
                           NoSuchFile,
 
30
                           UninitializableFormat,
 
31
                           NotBranchError,
 
32
                           )
 
33
import bzrlib.repository as repository
 
34
from bzrlib.revision import NULL_REVISION
 
35
from bzrlib.tests import TestCase, TestCaseWithTransport, TestSkipped
 
36
from bzrlib.tests.bzrdir_implementations.test_bzrdir import TestCaseWithBzrDir
 
37
from bzrlib.trace import mutter
 
38
import bzrlib.transactions as transactions
 
39
from bzrlib.transport import get_transport
 
40
from bzrlib.upgrade import upgrade
 
41
from bzrlib.workingtree import WorkingTree
 
42
 
 
43
 
 
44
class TestCaseWithRepository(TestCaseWithBzrDir):
 
45
 
 
46
    def setUp(self):
 
47
        super(TestCaseWithRepository, self).setUp()
 
48
 
 
49
    def make_branch(self, relpath):
 
50
        repo = self.make_repository(relpath)
 
51
        return repo.bzrdir.create_branch()
 
52
 
 
53
    def make_bzrdir(self, relpath):
 
54
        try:
 
55
            url = self.get_url(relpath)
 
56
            segments = url.split('/')
 
57
            if segments and segments[-1] not in ('', '.'):
 
58
                parent = '/'.join(segments[:-1])
 
59
                t = get_transport(parent)
 
60
                try:
 
61
                    t.mkdir(segments[-1])
 
62
                except FileExists:
 
63
                    pass
 
64
            return self.bzrdir_format.initialize(url)
 
65
        except UninitializableFormat:
 
66
            raise TestSkipped("Format %s is not initializable.")
 
67
 
 
68
    def make_repository(self, relpath):
 
69
        made_control = self.make_bzrdir(relpath)
 
70
        return self.repository_format.initialize(made_control)
 
71
 
 
72
 
 
73
class TestRepository(TestCaseWithRepository):
 
74
 
 
75
    def test_clone_to_default_format(self):
 
76
        #TODO: Test that cloning a repository preserves all the information
 
77
        # such as signatures[not tested yet] etc etc.
 
78
        # when changing to the current default format.
 
79
        tree_a = self.make_branch_and_tree('a')
 
80
        self.build_tree(['a/foo'])
 
81
        tree_a.add('foo', 'file1')
 
82
        tree_a.commit('rev1', rev_id='rev1')
 
83
        bzrdirb = self.make_bzrdir('b')
 
84
        repo_b = tree_a.branch.repository.clone(bzrdirb)
 
85
        tree_b = repo_b.revision_tree('rev1')
 
86
        tree_b.get_file_text('file1')
 
87
        rev1 = repo_b.get_revision('rev1')
 
88
 
 
89
    def test_clone_specific_format(self):
 
90
        """todo"""
 
91
 
 
92
    def test_format_initialize_find_open(self):
 
93
        # loopback test to check the current format initializes to itself.
 
94
        if not self.repository_format.is_supported():
 
95
            # unsupported formats are not loopback testable
 
96
            # because the default open will not open them and
 
97
            # they may not be initializable.
 
98
            return
 
99
        # supported formats must be able to init and open
 
100
        t = get_transport(self.get_url())
 
101
        readonly_t = get_transport(self.get_readonly_url())
 
102
        made_control = self.bzrdir_format.initialize(t.base)
 
103
        made_repo = self.repository_format.initialize(made_control)
 
104
        self.failUnless(isinstance(made_repo, repository.Repository))
 
105
        self.assertEqual(made_control, made_repo.bzrdir)
 
106
 
 
107
        # find it via bzrdir opening:
 
108
        opened_control = bzrdir.BzrDir.open(readonly_t.base)
 
109
        direct_opened_repo = opened_control.open_repository()
 
110
        self.assertEqual(direct_opened_repo.__class__, made_repo.__class__)
 
111
        self.assertEqual(opened_control, direct_opened_repo.bzrdir)
 
112
 
 
113
        self.failUnless(isinstance(direct_opened_repo._format,
 
114
                        self.repository_format.__class__))
 
115
        # find it via Repository.open
 
116
        opened_repo = repository.Repository.open(readonly_t.base)
 
117
        self.failUnless(isinstance(opened_repo, made_repo.__class__))
 
118
        self.assertEqual(made_repo._format.__class__,
 
119
                         opened_repo._format.__class__)
 
120
        # if it has a unique id string, can we probe for it ?
 
121
        try:
 
122
            self.repository_format.get_format_string()
 
123
        except NotImplementedError:
 
124
            return
 
125
        self.assertEqual(self.repository_format,
 
126
                         repository.RepositoryFormat.find_format(opened_control))
 
127
 
 
128
    def test_create_repository(self):
 
129
        # bzrdir can construct a repository for itself.
 
130
        if not self.bzrdir_format.is_supported():
 
131
            # unsupported formats are not loopback testable
 
132
            # because the default open will not open them and
 
133
            # they may not be initializable.
 
134
            return
 
135
        t = get_transport(self.get_url())
 
136
        made_control = self.bzrdir_format.initialize(t.base)
 
137
        made_repo = made_control.create_repository()
 
138
        self.failUnless(isinstance(made_repo, repository.Repository))
 
139
        self.assertEqual(made_control, made_repo.bzrdir)
 
140
        
 
141
    def test_create_repository_shared(self):
 
142
        # bzrdir can construct a shared repository.
 
143
        if not self.bzrdir_format.is_supported():
 
144
            # unsupported formats are not loopback testable
 
145
            # because the default open will not open them and
 
146
            # they may not be initializable.
 
147
            return
 
148
        t = get_transport(self.get_url())
 
149
        made_control = self.bzrdir_format.initialize(t.base)
 
150
        try:
 
151
            made_repo = made_control.create_repository(shared=True)
 
152
        except errors.IncompatibleFormat:
 
153
            # not all repository formats understand being shared, or
 
154
            # may only be shared in some circumstances.
 
155
            return
 
156
        self.failUnless(isinstance(made_repo, repository.Repository))
 
157
        self.assertEqual(made_control, made_repo.bzrdir)
 
158
        self.assertTrue(made_repo.is_shared())
 
159
 
 
160
    def test_revision_tree(self):
 
161
        wt = self.make_branch_and_tree('.')
 
162
        wt.commit('lala!', rev_id='revision-1', allow_pointless=True)
 
163
        tree = wt.branch.repository.revision_tree('revision-1')
 
164
        self.assertEqual(list(tree.list_files()), [])
 
165
        tree = wt.branch.repository.revision_tree(None)
 
166
        self.assertEqual(len(tree.list_files()), 0)
 
167
        tree = wt.branch.repository.revision_tree(NULL_REVISION)
 
168
        self.assertEqual(len(tree.list_files()), 0)
 
169
 
 
170
    def test_fetch(self):
 
171
        # smoke test fetch to ensure that the convenience function works.
 
172
        # it is defined as a convenience function with the underlying 
 
173
        # functionality provided by an InterRepository
 
174
        tree_a = self.make_branch_and_tree('a')
 
175
        self.build_tree(['a/foo'])
 
176
        tree_a.add('foo', 'file1')
 
177
        tree_a.commit('rev1', rev_id='rev1')
 
178
        # fetch with a default limit (grab everything)
 
179
        repo = bzrdir.BzrDir.create_repository(self.get_url('b'))
 
180
        repo.fetch(tree_a.branch.repository,
 
181
                   revision_id=None,
 
182
                   pb=bzrlib.progress.DummyProgress())
 
183
 
 
184
    def test_clone_bzrdir_repository_revision(self):
 
185
        # make a repository with some revisions,
 
186
        # and clone it, this should not have unreferenced revisions.
 
187
        # also: test cloning with a revision id of NULL_REVISION -> empty repo.
 
188
        raise TestSkipped('revision limiting is not implemented yet.')
 
189
 
 
190
    def test_clone_repository_basis_revision(self):
 
191
        raise TestSkipped('the use of a basis should not add noise data to the result.')
 
192
 
 
193
    def test_clone_repository_incomplete_source_with_basis(self):
 
194
        # ensure that basis really does grab from the basis by having incomplete source
 
195
        tree = self.make_branch_and_tree('commit_tree')
 
196
        self.build_tree(['foo'], transport=tree.bzrdir.transport.clone('..'))
 
197
        tree.add('foo')
 
198
        tree.commit('revision 1', rev_id='1')
 
199
        source = self.make_repository('source')
 
200
        # this gives us an incomplete repository
 
201
        tree.bzrdir.open_repository().copy_content_into(source)
 
202
        tree.commit('revision 2', rev_id='2', allow_pointless=True)
 
203
        self.assertFalse(source.has_revision('2'))
 
204
        target = source.bzrdir.clone(self.get_url('target'), basis=tree.bzrdir)
 
205
        self.assertTrue(target.open_repository().has_revision('2'))
 
206
 
 
207
    def test_clone_shared_no_tree(self):
 
208
        # cloning a shared repository keeps it shared
 
209
        # and preserves the make_working_tree setting.
 
210
        made_control = self.make_bzrdir('source')
 
211
        try:
 
212
            made_repo = made_control.create_repository(shared=True)
 
213
        except errors.IncompatibleFormat:
 
214
            # not all repository formats understand being shared, or
 
215
            # may only be shared in some circumstances.
 
216
            return
 
217
        made_repo.set_make_working_trees(False)
 
218
        result = made_control.clone(self.get_url('target'))
 
219
        self.failUnless(isinstance(made_repo, repository.Repository))
 
220
        self.assertEqual(made_control, made_repo.bzrdir)
 
221
        self.assertTrue(result.open_repository().is_shared())
 
222
        self.assertFalse(result.open_repository().make_working_trees())
 
223
 
 
224
 
 
225
class TestCaseWithComplexRepository(TestCaseWithRepository):
 
226
 
 
227
    def setUp(self):
 
228
        super(TestCaseWithComplexRepository, self).setUp()
 
229
        tree_a = self.make_branch_and_tree('a')
 
230
        self.bzrdir = tree_a.branch.bzrdir
 
231
        # add a corrupt inventory 'orphan'
 
232
        # this may need some generalising for knits.
 
233
        tree_a.branch.repository.control_weaves.add_text(
 
234
            'inventory', 'orphan', [], [],
 
235
            tree_a.branch.repository.get_transaction())
 
236
        # add a real revision 'rev1'
 
237
        tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
 
238
        # add a real revision 'rev2' based on rev1
 
239
        tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
 
240
 
 
241
    def test_all_revision_ids(self):
 
242
        # all_revision_ids -> all revisions
 
243
        self.assertEqual(['rev1', 'rev2'],
 
244
                         self.bzrdir.open_repository().all_revision_ids())
 
245
 
 
246
    def test_get_ancestry_missing_revision(self):
 
247
        # get_ancestry(revision that is in some data but not fully installed
 
248
        # -> NoSuchRevision
 
249
        self.assertRaises(errors.NoSuchRevision,
 
250
                          self.bzrdir.open_repository().get_ancestry, 'orphan')