91
87
def test_attribute_format_pack_compresses(self):
    # Check the format's 'pack_compresses' attribute against the two
    # values the format is allowed to advertise.
    allowed_values = (True, False)
    self.assertFormatAttribute('pack_compresses', allowed_values)
94
def test_attribute_inventories_store(self):
    """Test the existence of the inventories attribute."""
    # Reach the repository through a freshly made tree, then check the
    # type of the object it exposes as ``inventories``.
    repository = self.make_branch_and_tree('tree').branch.repository
    inventories = repository.inventories
    self.assertIsInstance(inventories, versionedfile.VersionedFiles)
100
def test_attribute_inventories_basics(self):
101
"""Test basic aspects of the inventories attribute."""
102
tree = self.make_branch_and_tree('tree')
103
repo = tree.branch.repository
104
# The commit's revision id is wrapped in a 1-tuple so it can be compared
# directly with the entries returned by repo.inventories.keys().
rev_id = (tree.commit('a'),)
# NOTE(review): source line 105 is absent from this extract. Presumably it
# acquires the tree lock that the cleanup below releases -- confirm against
# the full file before relying on this block.
106
self.addCleanup(tree.unlock)
107
# After a single commit the inventories store should hold exactly one key.
self.assertEqual(set([rev_id]), set(repo.inventories.keys()))
109
def test_attribute_revision_store(self):
    """Test the existence of the revisions attribute."""
    # Reach the repository through a freshly made tree, then check the
    # type of the object it exposes as ``revisions``.
    repository = self.make_branch_and_tree('tree').branch.repository
    revisions = repository.revisions
    self.assertIsInstance(revisions, versionedfile.VersionedFiles)
116
def test_attribute_revision_store_basics(self):
117
"""Test the basic behaviour of the revisions attribute."""
118
tree = self.make_branch_and_tree('tree')
119
repo = tree.branch.repository
# NOTE(review): source lines 120-121 are absent from this extract.
# Presumably they lock the repository before the checks below -- confirm.
122
# A brand new repository exposes no revision keys.
self.assertEqual(set(), set(repo.revisions.keys()))
123
# Revision keys are 1-tuples, so wrap the id returned by commit().
revid = (tree.commit("foo"),)
124
self.assertEqual(set([revid]), set(repo.revisions.keys()))
125
# The first commit has no parents: its parent map entry is an empty tuple.
self.assertEqual({revid:()},
126
repo.revisions.get_parent_map([revid]))
# NOTE(review): source lines 127-128 are absent from this extract.
# Presumably they end the locked section started above -- confirm.
129
# Build a small diamond: tree2 pulls the first revision, both trees commit,
# then tree merges tree2 and commits the merge.
tree2 = self.make_branch_and_tree('tree2')
130
tree2.pull(tree.branch)
131
left_id = (tree2.commit('left'),)
132
right_id = (tree.commit('right'),)
133
tree.merge_from_branch(tree2.branch)
134
merge_id = (tree.commit('merged'),)
# NOTE(review): source line 135 is absent from this extract. Presumably it
# re-locks the repository that the cleanup below unlocks -- confirm.
136
self.addCleanup(repo.unlock)
137
# All four revisions must now be visible through repo.revisions ...
self.assertEqual(set([revid, left_id, right_id, merge_id]),
138
set(repo.revisions.keys()))
139
# ... and the merge must list its parents as (right_id, left_id), i.e. the
# committing tree's own revision first, then the merged-in revision.
self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
140
merge_id:(right_id, left_id)},
141
repo.revisions.get_parent_map(repo.revisions.keys()))
143
def test_attribute_signature_store(self):
    """Test the existence of the signatures attribute."""
    # Reach the repository through a freshly made tree, then check the
    # type of the object it exposes as ``signatures``.
    repository = self.make_branch_and_tree('tree').branch.repository
    signatures = repository.signatures
    self.assertIsInstance(signatures, versionedfile.VersionedFiles)
90
def test_attribute_format_supports_full_versioned_files(self):
91
self.assertFormatAttribute('supports_full_versioned_files',
94
def test_attribute_format_supports_funky_characters(self):
95
self.assertFormatAttribute('supports_funky_characters',
98
def test_attribute_format_supports_leaving_lock(self):
99
self.assertFormatAttribute('supports_leaving_lock',
102
def test_format_is_deprecated(self):
    # Whatever the format, is_deprecated() has to answer with a value
    # found in (True, False).
    repo_format = self.make_repository('repo')._format
    answer = repo_format.is_deprecated()
    self.assertSubset([answer], (True, False))
106
def test_format_is_supported(self):
    # Whatever the format, is_supported() has to answer with a value
    # found in (True, False).
    repo_format = self.make_repository('repo')._format
    answer = repo_format.is_supported()
    self.assertSubset([answer], (True, False))
150
110
def test_attribute_text_store_basics(self):
151
111
"""Test the basic behaviour of the text store."""
210
170
self.assertIsInstance(repo.texts,
211
171
versionedfile.VersionedFiles)
213
def test_exposed_versioned_files_are_marked_dirty(self):
214
repo = self.make_repository('.')
# NOTE(review): source line 215 is absent from this extract. Presumably the
# repository is locked before its stores are fetched -- confirm.
216
# Keep references to the stores so they can be exercised later on.
signatures = repo.signatures
217
revisions = repo.revisions
218
inventories = repo.inventories
# NOTE(review): source line 219 is absent from this extract. Presumably the
# repository is unlocked here, since every check below expects
# ObjectNotLocked -- confirm.
220
# NOTE(review): the next three assertRaises calls are incomplete here; their
# continuation lines (source lines 221, 223 and 225) are absent from this
# extract, so what they invoke cannot be told from this view.
self.assertRaises(errors.ObjectNotLocked,
222
self.assertRaises(errors.ObjectNotLocked,
224
self.assertRaises(errors.ObjectNotLocked,
226
# Writing through any of the retained store references must be refused
# with ObjectNotLocked.
self.assertRaises(errors.ObjectNotLocked,
227
signatures.add_lines, ('foo',), [], [])
228
self.assertRaises(errors.ObjectNotLocked,
229
revisions.add_lines, ('foo',), [], [])
230
self.assertRaises(errors.ObjectNotLocked,
231
inventories.add_lines, ('foo',), [], [])
233
173
def test_clone_to_default_format(self):
234
174
#TODO: Test that cloning a repository preserves all the information
235
175
# such as signatures[not tested yet] etc etc.
599
539
self.addCleanup(rev_tree.unlock)
600
540
self.assertEqual('rev_id', rev_tree.inventory.root.revision)
602
def test_upgrade_from_format4(self):
603
# Imported inside the test so the directory template is only loaded
# when this test actually runs.
from bzrlib.tests.test_upgrade import _upgrade_dir_template
604
if isinstance(self.repository_format, remote.RemoteRepositoryFormat):
605
return # local conversion to/from RemoteObjects is irrelevant.
606
if self.repository_format.get_format_description() \
607
== "Repository format 4":
608
raise tests.TestSkipped('Cannot convert format-4 to itself')
609
# Lay the template directory out on disk, then open it with the
# "unsupported" opener to read the old repository's format.
self.build_tree_contents(_upgrade_dir_template)
610
old_repodir = bzrdir.BzrDir.open_unsupported('.')
611
old_repo_format = old_repodir.open_repository()._format
612
# Upgrade target: the bzrdir format paired with the format under test.
format = self.repository_format._matchingbzrdir
# NOTE(review): source line 613 is absent from this extract. Presumably it
# is the ``try:`` that pairs with the ``except AttributeError`` below --
# confirm.
614
format.repository_format = self.repository_format
615
except AttributeError:
# NOTE(review): source line 616, the body of the except clause, is absent
# from this extract -- confirm what it does.
617
upgrade.upgrade('.', format)
619
542
def test_pointless_commit(self):
620
543
tree = self.make_branch_and_tree('.')
621
544
self.assertRaises(errors.PointlessCommit, tree.commit, 'pointless',
871
791
def test_sprout_branch_from_hpss_preserves_repo_format(self):
872
792
"""branch.sprout from a smart server preserves the repository format.
874
weave_formats = [weaverepo.RepositoryFormat5(),
875
weaverepo.RepositoryFormat6(),
876
weaverepo.RepositoryFormat7()]
877
if self.repository_format in weave_formats:
794
if not self.repository_format.supports_leaving_lock:
878
795
raise tests.TestNotApplicable(
879
"Cannot fetch weaves over smart protocol.")
796
"Format can not be used over HPSS")
880
797
remote_repo = self.make_remote_repository('remote')
881
798
remote_branch = remote_repo.bzrdir.create_branch()
893
810
"""branch.sprout from a smart server preserves the repository format of
894
811
a branch from a shared repository.
896
weave_formats = [weaverepo.RepositoryFormat5(),
897
weaverepo.RepositoryFormat6(),
898
weaverepo.RepositoryFormat7()]
899
if self.repository_format in weave_formats:
813
if not self.repository_format.supports_leaving_lock:
900
814
raise tests.TestNotApplicable(
901
"Cannot fetch weaves over smart protocol.")
815
"Format can not be used over HPSS")
902
816
# Make a shared repo
903
817
remote_repo = self.make_remote_repository('remote', shared=True)
904
818
remote_backing_repo = bzrdir.BzrDir.open(
1151
1063
self.assertThat(repo.lock_write, ReturnsUnlockable(repo))
1154
class TestCaseWithComplexRepository(per_repository.TestCaseWithRepository):
1157
super(TestCaseWithComplexRepository, self).setUp()
1158
tree_a = self.make_branch_and_tree('a')
1159
self.bzrdir = tree_a.branch.bzrdir
1160
# add a corrupt inventory 'orphan'
1161
# this may need some generalising for knits.
1164
tree_a.branch.repository.start_write_group()
1166
inv_file = tree_a.branch.repository.inventories
1167
inv_file.add_lines(('orphan',), [], [])
1169
tree_a.branch.repository.commit_write_group()
1172
tree_a.branch.repository.abort_write_group()
1175
# add a real revision 'rev1'
1176
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
1177
# add a real revision 'rev2' based on rev1
1178
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
1179
# add a reference to a ghost
1180
tree_a.add_parent_tree_id('ghost1')
1182
tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
1183
except errors.RevisionNotPresent:
1184
raise tests.TestNotApplicable(
1185
"Cannot test with ghosts for this format.")
1186
# add another reference to a ghost, and a second ghost.
1187
tree_a.add_parent_tree_id('ghost1')
1188
tree_a.add_parent_tree_id('ghost2')
1189
tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
1191
def test_revision_trees(self):
    # Trees fetched in bulk through revision_trees() are compared, pair
    # by pair, with trees fetched one at a time through revision_tree().
    repo = self.bzrdir.open_repository()
    repo.lock_read()
    self.addCleanup(repo.unlock)
    wanted_ids = ['rev1', 'rev2', 'rev3', 'rev4']
    bulk_trees = list(repo.revision_trees(wanted_ids))
    single_trees = [repo.revision_tree(rev_id) for rev_id in wanted_ids]
    self.assertEqual(len(bulk_trees), len(single_trees))
    for bulk_tree, single_tree in zip(bulk_trees, single_trees):
        delta = single_tree.changes_from(bulk_tree)
        self.assertFalse(delta.has_changed())
1202
def test_get_deltas_for_revisions(self):
1203
repository = self.bzrdir.open_repository()
1204
# Hold a read lock for the whole test; the cleanup releases it.
repository.lock_read()
1205
self.addCleanup(repository.unlock)
1206
revisions = [repository.get_revision(r) for r in
1207
['rev1', 'rev2', 'rev3', 'rev4']]
1208
# deltas1 comes from the bulk API, deltas2 from one call per revision;
# the final assertion requires the two lists to be equal.
deltas1 = list(repository.get_deltas_for_revisions(revisions))
1209
deltas2 = [repository.get_revision_delta(r.revision_id) for r in
# NOTE(review): source line 1210 is absent from this extract. It should
# hold the iterable that closes the comprehension above (presumably
# ``revisions``) -- confirm.
1211
self.assertEqual(deltas1, deltas2)
1213
def test_all_revision_ids(self):
    # all_revision_ids -> all revisions
    expected_ids = set(['rev1', 'rev2', 'rev3', 'rev4'])
    repository = self.bzrdir.open_repository()
    self.assertEqual(expected_ids, set(repository.all_revision_ids()))
1218
def test_get_ancestry_missing_revision(self):
1219
# get_ancestry(revision that is in some data but not fully installed
# NOTE(review): source line 1220 is absent from this extract. Presumably it
# is the rest of the comment above -- confirm it is not a statement.
1221
# Asking for the ancestry of 'orphan' is expected to raise NoSuchRevision.
self.assertRaises(errors.NoSuchRevision,
1222
self.bzrdir.open_repository().get_ancestry, 'orphan')
1224
def test_get_unordered_ancestry(self):
    # With or without topo_sorted, get_ancestry('rev3') must report the
    # same set of revisions.
    repository = self.bzrdir.open_repository()
    default_ancestry = set(repository.get_ancestry('rev3'))
    unsorted_ancestry = set(
        repository.get_ancestry('rev3', topo_sorted=False))
    self.assertEqual(default_ancestry, unsorted_ancestry)
1229
def test_reserved_id(self):
1230
repo = self.make_repository('repository')
# NOTE(review): source line 1231 is absent from this extract. Presumably the
# repository is write-locked before the write group is opened -- confirm.
1232
repo.start_write_group()
# NOTE(review): source line 1233 is absent from this extract -- possibly a
# ``try:`` guarding the write group; confirm.
1234
# Each call below passes the id 'reserved:' and must raise ReservedId.
# NOTE(review): the continuation of this call (source line 1235) is absent
# from this extract.
self.assertRaises(errors.ReservedId, repo.add_inventory, 'reserved:',
1236
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
1237
"foo", [], 'reserved:', None)
1238
# NOTE(review): the continuation of this call and what follows it (source
# lines 1239-1240) are absent from this extract.
self.assertRaises(errors.ReservedId, repo.add_revision, 'reserved:',
1241
# Abort the write group opened at the top of the test.
repo.abort_write_group()
1245
1066
class TestCaseWithCorruptRepository(per_repository.TestCaseWithRepository):
1247
1068
def setUp(self):