1
# Copyright (C) 2011 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for repository implementations - tests a repository format."""
25
from bzrlib.tests.per_repository_vf import (
26
TestCaseWithRepository,
27
all_repository_vf_format_scenarios,
29
from bzrlib.tests.scenarios import load_tests_apply_scenarios
32
# Module-level ``load_tests`` hook, bound to the helper imported from
# bzrlib.tests.scenarios.
# NOTE(review): presumably this expands each test class by its ``scenarios``
# attribute when the module is loaded -- confirm against that helper.
load_tests = load_tests_apply_scenarios
35
class TestRepository(TestCaseWithRepository):
    """Exercise the versioned-file attributes exposed by a repository.

    The tests look at the ``inventories``, ``revisions`` and ``signatures``
    attributes of the repository behind a freshly made branch or repository.
    """

    scenarios = all_repository_vf_format_scenarios()

    def test_attribute_inventories_store(self):
        """Test the existence of the inventories attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.inventories, versionedfile.VersionedFiles)

    def test_attribute_inventories_basics(self):
        """Test basic aspects of the inventories attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        rev_id = (tree.commit('a'),)
        # The cleanup releases a lock, so one has to be taken first.
        tree.lock_read()
        self.addCleanup(tree.unlock)
        self.assertEqual(set([rev_id]), set(repo.inventories.keys()))

    def test_attribute_revision_store(self):
        """Test the existence of the revisions attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.revisions,
            versionedfile.VersionedFiles)

    def test_attribute_revision_store_basics(self):
        """Test the basic behaviour of the revisions attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        # NOTE(review): the reads below are done under a repository lock
        # that is released before the second tree is created; a write lock
        # is used because a commit happens while it is held -- confirm.
        repo.lock_write()
        try:
            self.assertEqual(set(), set(repo.revisions.keys()))
            revid = (tree.commit("foo"),)
            self.assertEqual(set([revid]), set(repo.revisions.keys()))
            self.assertEqual({revid: ()},
                repo.revisions.get_parent_map([revid]))
        finally:
            repo.unlock()
        # Build a diamond: two children of revid, then a merge of both.
        tree2 = self.make_branch_and_tree('tree2')
        tree2.pull(tree.branch)
        left_id = (tree2.commit('left'),)
        right_id = (tree.commit('right'),)
        tree.merge_from_branch(tree2.branch)
        merge_id = (tree.commit('merged'),)
        # Take the lock that the cleanup below releases.
        repo.lock_read()
        self.addCleanup(repo.unlock)
        self.assertEqual(set([revid, left_id, right_id, merge_id]),
            set(repo.revisions.keys()))
        self.assertEqual({revid: (), left_id: (revid,), right_id: (revid,),
             merge_id: (right_id, left_id)},
            repo.revisions.get_parent_map(repo.revisions.keys()))

    def test_attribute_signature_store(self):
        """Test the existence of the signatures attribute."""
        tree = self.make_branch_and_tree('tree')
        repo = tree.branch.repository
        self.assertIsInstance(repo.signatures,
            versionedfile.VersionedFiles)

    def test_exposed_versioned_files_are_marked_dirty(self):
        """Check store handles refuse use once the repository is unlocked.

        The handles are fetched while the repository is write-locked; after
        the unlock, both reading keys and adding lines must raise
        ObjectNotLocked.
        """
        repo = self.make_repository('.')
        repo.lock_write()
        signatures = repo.signatures
        revisions = repo.revisions
        inventories = repo.inventories
        repo.unlock()
        stores = (signatures, revisions, inventories)
        # NOTE(review): ``keys`` is the read access chosen for the three
        # checks whose callable was missing -- confirm it is the intended one.
        for store in stores:
            self.assertRaises(errors.ObjectNotLocked, store.keys)
        for store in stores:
            self.assertRaises(errors.ObjectNotLocked,
                store.add_lines, ('foo',), [], [])
116
class TestCaseWithComplexRepository(TestCaseWithRepository):
118
scenarios = all_repository_vf_format_scenarios()
121
def setUp(self):
    """Create branch 'a' holding an orphan inventory and revisions rev1-rev4.

    rev3 and rev4 are committed with ghost parents ('ghost1', then 'ghost1'
    and 'ghost2').  The bzrdir of the branch is kept on ``self.bzrdir`` for
    the tests to reopen the repository from.

    Raises tests.TestNotApplicable when committing with a ghost parent
    raises errors.RevisionNotPresent.
    """
    super(TestCaseWithComplexRepository, self).setUp()
    tree_a = self.make_branch_and_tree('a')
    self.bzrdir = tree_a.branch.bzrdir
    repo = tree_a.branch.repository
    # add a corrupt inventory 'orphan'
    # this may need some generalising for knits.
    tree_a.lock_write()
    try:
        repo.start_write_group()
        try:
            inv_file = repo.inventories
            inv_file.add_lines(('orphan',), [], [])
        except:
            # Any failure: drop the half-written group, then re-raise.
            repo.abort_write_group()
            raise
        else:
            # NOTE(review): committing keeps the orphan inventory in the
            # repository; confirm every format accepts an inventory text
            # that has no matching revision.
            repo.commit_write_group()
    finally:
        tree_a.unlock()
    # add a real revision 'rev1'
    tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
    # add a real revision 'rev2' based on rev1
    tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
    # add a reference to a ghost
    tree_a.add_parent_tree_id('ghost1')
    try:
        tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
    except errors.RevisionNotPresent:
        raise tests.TestNotApplicable(
            "Cannot test with ghosts for this format.")
    # add another reference to a ghost, and a second ghost.
    tree_a.add_parent_tree_id('ghost1')
    tree_a.add_parent_tree_id('ghost2')
    tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
155
def test_revision_trees(self):
    """Trees from revision_trees() match those from revision_tree()."""
    wanted = ['rev1', 'rev2', 'rev3', 'rev4']
    repo = self.bzrdir.open_repository()
    repo.lock_read()
    self.addCleanup(repo.unlock)
    # Fetch the same trees in one batch call and one call per id.
    batch = list(repo.revision_trees(wanted))
    singles = [repo.revision_tree(rev_id) for rev_id in wanted]
    self.assertEqual(len(batch), len(singles))
    for batch_tree, single_tree in zip(batch, singles):
        delta = single_tree.changes_from(batch_tree)
        self.assertFalse(delta.has_changed())
166
def test_get_deltas_for_revisions(self):
    """get_deltas_for_revisions() agrees with per-revision deltas."""
    repository = self.bzrdir.open_repository()
    repository.lock_read()
    self.addCleanup(repository.unlock)
    revisions = [repository.get_revision(r) for r in
        ['rev1', 'rev2', 'rev3', 'rev4']]
    deltas1 = list(repository.get_deltas_for_revisions(revisions))
    # One delta per revision object fetched above, in the same order.
    deltas2 = [repository.get_revision_delta(r.revision_id) for r in
        revisions]
    self.assertEqual(deltas1, deltas2)
177
def test_all_revision_ids(self):
    """all_revision_ids() reports exactly rev1, rev2, rev3 and rev4."""
    expected = set(['rev1', 'rev2', 'rev3', 'rev4'])
    repo = self.bzrdir.open_repository()
    self.assertEqual(expected, set(repo.all_revision_ids()))
182
def test_get_ancestry_missing_revision(self):
    """get_ancestry('orphan') raises NoSuchRevision."""
    # 'orphan' is a revision that is in some data but not fully installed.
    repo = self.bzrdir.open_repository()
    self.assertRaises(errors.NoSuchRevision, repo.get_ancestry, 'orphan')
188
def test_get_unordered_ancestry(self):
    """Ancestry of 'rev3' has the same members with topo_sorted=False."""
    repo = self.bzrdir.open_repository()
    default_ancestry = repo.get_ancestry('rev3')
    unsorted_ancestry = repo.get_ancestry('rev3', topo_sorted=False)
    self.assertEqual(set(default_ancestry), set(unsorted_ancestry))
193
def test_reserved_id(self):
194
repo = self.make_repository('repository')
196
repo.start_write_group()
198
self.assertRaises(errors.ReservedId, repo.add_inventory,
199
'reserved:', None, None)
200
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
201
"foo", [], 'reserved:', None)
202
self.assertRaises(errors.ReservedId, repo.add_revision,
205
repo.abort_write_group()