~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/per_repository_vf/test_repository.py

merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Tests for repository implementations - tests a repository format."""
 
18
 
 
19
from bzrlib import (
 
20
    errors,
 
21
    tests,
 
22
    versionedfile,
 
23
    )
 
24
 
 
25
from bzrlib.tests.per_repository_vf import (
 
26
    TestCaseWithRepository,
 
27
    all_repository_vf_format_scenarios,
 
28
    )
 
29
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
30
 
 
31
 
 
32
load_tests = load_tests_apply_scenarios
 
33
 
 
34
 
 
35
class TestRepository(TestCaseWithRepository):
 
36
 
 
37
    scenarios = all_repository_vf_format_scenarios()
 
38
 
 
39
    def test_attribute_inventories_store(self):
 
40
        """Test the existence of the inventories attribute."""
 
41
        tree = self.make_branch_and_tree('tree')
 
42
        repo = tree.branch.repository
 
43
        self.assertIsInstance(repo.inventories, versionedfile.VersionedFiles)
 
44
 
 
45
    def test_attribute_inventories_basics(self):
 
46
        """Test basic aspects of the inventories attribute."""
 
47
        tree = self.make_branch_and_tree('tree')
 
48
        repo = tree.branch.repository
 
49
        rev_id = (tree.commit('a'),)
 
50
        tree.lock_read()
 
51
        self.addCleanup(tree.unlock)
 
52
        self.assertEqual(set([rev_id]), set(repo.inventories.keys()))
 
53
 
 
54
    def test_attribute_revision_store(self):
 
55
        """Test the existence of the revisions attribute."""
 
56
        tree = self.make_branch_and_tree('tree')
 
57
        repo = tree.branch.repository
 
58
        self.assertIsInstance(repo.revisions,
 
59
            versionedfile.VersionedFiles)
 
60
 
 
61
    def test_attribute_revision_store_basics(self):
 
62
        """Test the basic behaviour of the revisions attribute."""
 
63
        tree = self.make_branch_and_tree('tree')
 
64
        repo = tree.branch.repository
 
65
        repo.lock_write()
 
66
        try:
 
67
            self.assertEqual(set(), set(repo.revisions.keys()))
 
68
            revid = (tree.commit("foo"),)
 
69
            self.assertEqual(set([revid]), set(repo.revisions.keys()))
 
70
            self.assertEqual({revid:()},
 
71
                repo.revisions.get_parent_map([revid]))
 
72
        finally:
 
73
            repo.unlock()
 
74
        tree2 = self.make_branch_and_tree('tree2')
 
75
        tree2.pull(tree.branch)
 
76
        left_id = (tree2.commit('left'),)
 
77
        right_id = (tree.commit('right'),)
 
78
        tree.merge_from_branch(tree2.branch)
 
79
        merge_id = (tree.commit('merged'),)
 
80
        repo.lock_read()
 
81
        self.addCleanup(repo.unlock)
 
82
        self.assertEqual(set([revid, left_id, right_id, merge_id]),
 
83
            set(repo.revisions.keys()))
 
84
        self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
 
85
             merge_id:(right_id, left_id)},
 
86
            repo.revisions.get_parent_map(repo.revisions.keys()))
 
87
 
 
88
    def test_attribute_signature_store(self):
 
89
        """Test the existence of the signatures attribute."""
 
90
        tree = self.make_branch_and_tree('tree')
 
91
        repo = tree.branch.repository
 
92
        self.assertIsInstance(repo.signatures,
 
93
            versionedfile.VersionedFiles)
 
94
 
 
95
    def test_exposed_versioned_files_are_marked_dirty(self):
 
96
        repo = self.make_repository('.')
 
97
        repo.lock_write()
 
98
        signatures = repo.signatures
 
99
        revisions = repo.revisions
 
100
        inventories = repo.inventories
 
101
        repo.unlock()
 
102
        self.assertRaises(errors.ObjectNotLocked,
 
103
            signatures.keys)
 
104
        self.assertRaises(errors.ObjectNotLocked,
 
105
            revisions.keys)
 
106
        self.assertRaises(errors.ObjectNotLocked,
 
107
            inventories.keys)
 
108
        self.assertRaises(errors.ObjectNotLocked,
 
109
            signatures.add_lines, ('foo',), [], [])
 
110
        self.assertRaises(errors.ObjectNotLocked,
 
111
            revisions.add_lines, ('foo',), [], [])
 
112
        self.assertRaises(errors.ObjectNotLocked,
 
113
            inventories.add_lines, ('foo',), [], [])
 
114
 
 
115
 
 
116
class TestCaseWithComplexRepository(TestCaseWithRepository):
 
117
 
 
118
    scenarios = all_repository_vf_format_scenarios()
 
119
 
 
120
    def setUp(self):
 
121
        super(TestCaseWithComplexRepository, self).setUp()
 
122
        tree_a = self.make_branch_and_tree('a')
 
123
        self.bzrdir = tree_a.branch.bzrdir
 
124
        # add a corrupt inventory 'orphan'
 
125
        # this may need some generalising for knits.
 
126
        tree_a.lock_write()
 
127
        try:
 
128
            tree_a.branch.repository.start_write_group()
 
129
            try:
 
130
                inv_file = tree_a.branch.repository.inventories
 
131
                inv_file.add_lines(('orphan',), [], [])
 
132
            except:
 
133
                tree_a.branch.repository.commit_write_group()
 
134
                raise
 
135
            else:
 
136
                tree_a.branch.repository.abort_write_group()
 
137
        finally:
 
138
            tree_a.unlock()
 
139
        # add a real revision 'rev1'
 
140
        tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
 
141
        # add a real revision 'rev2' based on rev1
 
142
        tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
 
143
        # add a reference to a ghost
 
144
        tree_a.add_parent_tree_id('ghost1')
 
145
        try:
 
146
            tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
 
147
        except errors.RevisionNotPresent:
 
148
            raise tests.TestNotApplicable(
 
149
                "Cannot test with ghosts for this format.")
 
150
        # add another reference to a ghost, and a second ghost.
 
151
        tree_a.add_parent_tree_id('ghost1')
 
152
        tree_a.add_parent_tree_id('ghost2')
 
153
        tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
 
154
 
 
155
    def test_revision_trees(self):
 
156
        revision_ids = ['rev1', 'rev2', 'rev3', 'rev4']
 
157
        repository = self.bzrdir.open_repository()
 
158
        repository.lock_read()
 
159
        self.addCleanup(repository.unlock)
 
160
        trees1 = list(repository.revision_trees(revision_ids))
 
161
        trees2 = [repository.revision_tree(t) for t in revision_ids]
 
162
        self.assertEqual(len(trees1), len(trees2))
 
163
        for tree1, tree2 in zip(trees1, trees2):
 
164
            self.assertFalse(tree2.changes_from(tree1).has_changed())
 
165
 
 
166
    def test_get_deltas_for_revisions(self):
 
167
        repository = self.bzrdir.open_repository()
 
168
        repository.lock_read()
 
169
        self.addCleanup(repository.unlock)
 
170
        revisions = [repository.get_revision(r) for r in
 
171
                     ['rev1', 'rev2', 'rev3', 'rev4']]
 
172
        deltas1 = list(repository.get_deltas_for_revisions(revisions))
 
173
        deltas2 = [repository.get_revision_delta(r.revision_id) for r in
 
174
                   revisions]
 
175
        self.assertEqual(deltas1, deltas2)
 
176
 
 
177
    def test_all_revision_ids(self):
 
178
        # all_revision_ids -> all revisions
 
179
        self.assertEqual(set(['rev1', 'rev2', 'rev3', 'rev4']),
 
180
            set(self.bzrdir.open_repository().all_revision_ids()))
 
181
 
 
182
    def test_get_ancestry_missing_revision(self):
 
183
        # get_ancestry(revision that is in some data but not fully installed
 
184
        # -> NoSuchRevision
 
185
        self.assertRaises(errors.NoSuchRevision,
 
186
                          self.bzrdir.open_repository().get_ancestry, 'orphan')
 
187
 
 
188
    def test_get_unordered_ancestry(self):
 
189
        repo = self.bzrdir.open_repository()
 
190
        self.assertEqual(set(repo.get_ancestry('rev3')),
 
191
                         set(repo.get_ancestry('rev3', topo_sorted=False)))
 
192
 
 
193
    def test_reserved_id(self):
 
194
        repo = self.make_repository('repository')
 
195
        repo.lock_write()
 
196
        repo.start_write_group()
 
197
        try:
 
198
            self.assertRaises(errors.ReservedId, repo.add_inventory,
 
199
                'reserved:', None, None)
 
200
            self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
 
201
                "foo", [], 'reserved:', None)
 
202
            self.assertRaises(errors.ReservedId, repo.add_revision,
 
203
                'reserved:', None)
 
204
        finally:
 
205
            repo.abort_write_group()
 
206
            repo.unlock()