83
88
def test_attribute_format_pack_compresses(self):
    """Check the format's 'pack_compresses' attribute against (True, False)."""
    allowed_values = (True, False)
    self.assertFormatAttribute('pack_compresses', allowed_values)
86
def test_attribute_format_supports_full_versioned_files(self):
# Checks the format attribute 'supports_full_versioned_files' via
# assertFormatAttribute.
# NOTE(review): the call below is unterminated in this view (gutter jumps
# 87 -> 90); its remaining argument line is not visible -- confirm.
87
self.assertFormatAttribute('supports_full_versioned_files',
90
def test_attribute_format_supports_funky_characters(self):
# Checks the format attribute 'supports_funky_characters' via
# assertFormatAttribute.
# NOTE(review): the call below is unterminated in this view (gutter jumps
# 91 -> 94); its remaining argument line is not visible -- confirm.
91
self.assertFormatAttribute('supports_funky_characters',
94
def test_attribute_format_supports_leaving_lock(self):
# Checks the format attribute 'supports_leaving_lock' via
# assertFormatAttribute.
# NOTE(review): the call below is unterminated in this view (gutter jumps
# 95 -> 98); its remaining argument line is not visible -- confirm.
95
self.assertFormatAttribute('supports_leaving_lock',
98
def test_attribute_format_versioned_directories(self):
    """Check 'supports_versioned_directories' against (True, False)."""
    allowed_values = (True, False)
    self.assertFormatAttribute('supports_versioned_directories',
                               allowed_values)
101
def test_attribute_format_revision_graph_can_have_wrong_parents(self):
# Checks the format attribute 'revision_graph_can_have_wrong_parents' via
# assertFormatAttribute.
# NOTE(review): the call below is unterminated in this view (gutter jumps
# 102 -> 105); its remaining argument line is not visible -- confirm.
102
self.assertFormatAttribute('revision_graph_can_have_wrong_parents',
105
def test_format_is_deprecated(self):
    """The format's is_deprecated() result must be one of True or False."""
    repo_format = self.make_repository('repo')._format
    deprecated = repo_format.is_deprecated()
    self.assertSubset([deprecated], (True, False))
109
def test_format_is_supported(self):
    """The format's is_supported() result must be one of True or False."""
    repo_format = self.make_repository('repo')._format
    supported = repo_format.is_supported()
    self.assertSubset([supported], (True, False))
91
def test_attribute_inventories_store(self):
    """The repository exposes ``inventories`` as a VersionedFiles object."""
    working_tree = self.make_branch_and_tree('tree')
    repository_under_test = working_tree.branch.repository
    inventories = repository_under_test.inventories
    self.assertIsInstance(inventories, versionedfile.VersionedFiles)
97
def test_attribute_inventories_basics(self):
98
"""Test basic aspects of the inventories attribute."""
# After one commit, the keys of repo.inventories must be exactly the
# 1-tuple key built from the committed revision id.
99
tree = self.make_branch_and_tree('tree')
100
repo = tree.branch.repository
101
rev_id = (tree.commit('a'),)
# NOTE(review): gutter jumps 101 -> 103; one line is not visible here
# (presumably the lock paired with the unlock cleanup below) -- confirm.
103
self.addCleanup(tree.unlock)
104
self.assertEqual(set([rev_id]), set(repo.inventories.keys()))
106
def test_attribute_revision_store(self):
    """The repository exposes ``revisions`` as a VersionedFiles object."""
    working_tree = self.make_branch_and_tree('tree')
    repository_under_test = working_tree.branch.repository
    revisions = repository_under_test.revisions
    self.assertIsInstance(revisions, versionedfile.VersionedFiles)
113
def test_attribute_revision_store_basics(self):
114
"""Test the basic behaviour of the revisions attribute."""
# Compares repo.revisions.keys() and get_parent_map() against the
# expected keys/parents, first for one commit and then for a merge.
115
tree = self.make_branch_and_tree('tree')
116
repo = tree.branch.repository
# NOTE(review): gutter jumps 116 -> 119; two lines are not visible here
# -- confirm against the full file.
119
# Before any commit is made in this test, no revision keys are expected.
self.assertEqual(set(), set(repo.revisions.keys()))
120
revid = (tree.commit("foo"),)
121
# The first commit is the only key and maps to an empty parents tuple.
self.assertEqual(set([revid]), set(repo.revisions.keys()))
122
self.assertEqual({revid:()},
123
repo.revisions.get_parent_map([revid]))
# NOTE(review): gutter jumps 123 -> 126; two lines are not visible here.
126
# Second tree pulls from the first; each side commits, then 'tree' merges
# tree2's branch so the merge commit is expected to have two parents.
tree2 = self.make_branch_and_tree('tree2')
127
tree2.pull(tree.branch)
128
left_id = (tree2.commit('left'),)
129
right_id = (tree.commit('right'),)
130
tree.merge_from_branch(tree2.branch)
131
merge_id = (tree.commit('merged'),)
# NOTE(review): gutter jumps 131 -> 133; the line before this cleanup
# (presumably a repo lock call) is not visible -- confirm.
133
self.addCleanup(repo.unlock)
134
self.assertEqual(set([revid, left_id, right_id, merge_id]),
135
set(repo.revisions.keys()))
136
self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
137
merge_id:(right_id, left_id)},
138
repo.revisions.get_parent_map(repo.revisions.keys()))
140
def test_attribute_signature_store(self):
    """The repository exposes ``signatures`` as a VersionedFiles object."""
    working_tree = self.make_branch_and_tree('tree')
    repository_under_test = working_tree.branch.repository
    signatures = repository_under_test.signatures
    self.assertIsInstance(signatures, versionedfile.VersionedFiles)
147
def test_attribute_text_store_basics(self):
148
"""Test the basic behaviour of the text store."""
# Tracks the expected text keys and their parents in 'keys'/'parents'
# while committing, branching and merging, and compares them with
# repo.texts.keys() and repo.texts.get_parent_map().
149
tree = self.make_branch_and_tree('tree')
150
repo = tree.branch.repository
# NOTE(review): gutter jumps 150 -> 152; the line that assigns file_id is
# not visible here (later lines use the literal 'Foo:Bar') -- confirm.
152
file_key = (file_id,)
# NOTE(review): gutter jumps 152 -> 155; two lines are not visible here.
155
self.assertEqual(set(), set(repo.texts.keys()))
156
tree.add(['foo'], [file_id], ['file'])
157
tree.put_file_bytes_non_atomic(file_id, 'content\n')
# NOTE(review): gutter jumps 157 -> 159; the 'try:' that the except
# clause below belongs to is not visible here.
159
rev_key = (tree.commit("foo"),)
160
except errors.IllegalPath:
161
# The test is reported as not applicable when the commit raises
# IllegalPath.
raise tests.TestNotApplicable(
162
'file_id %r cannot be stored on this'
163
' platform for this repo format' % (file_id,))
164
if repo._format.rich_root_data:
165
# With rich root data the root entry is expected as a text key too.
root_commit = (tree.get_root_id(),) + rev_key
166
keys = set([root_commit])
167
parents = {root_commit:()}
# NOTE(review): gutter jumps 167 -> 171; three lines (presumably the
# branch that initialises keys/parents otherwise) are not visible here.
171
keys.add(file_key + rev_key)
172
parents[file_key + rev_key] = ()
173
self.assertEqual(keys, set(repo.texts.keys()))
174
self.assertEqual(parents,
175
repo.texts.get_parent_map(repo.texts.keys()))
# NOTE(review): gutter jumps 175 -> 178; two lines are not visible here.
178
# Change the file on both sides, recording each new text key with the
# first commit's text as its parent.
tree2 = self.make_branch_and_tree('tree2')
179
tree2.pull(tree.branch)
180
tree2.put_file_bytes_non_atomic('Foo:Bar', 'right\n')
181
right_key = (tree2.commit('right'),)
182
keys.add(file_key + right_key)
183
parents[file_key + right_key] = (file_key + rev_key,)
184
tree.put_file_bytes_non_atomic('Foo:Bar', 'left\n')
185
left_key = (tree.commit('left'),)
186
keys.add(file_key + left_key)
187
parents[file_key + left_key] = (file_key + rev_key,)
188
tree.merge_from_branch(tree2.branch)
189
tree.put_file_bytes_non_atomic('Foo:Bar', 'merged\n')
# NOTE(review): gutter jumps 189 -> 192 and 192 -> 194; the 'try:' body
# and the handler body for UnsupportedOperation are not visible here.
192
except errors.UnsupportedOperation:
194
merge_key = (tree.commit('merged'),)
195
keys.add(file_key + merge_key)
196
# The merged text is expected to list the left text then the right text
# as its parents.
parents[file_key + merge_key] = (file_key + left_key,
197
file_key + right_key)
# NOTE(review): gutter jumps 197 -> 199; the line before this cleanup
# (presumably a repo lock call) is not visible -- confirm.
199
self.addCleanup(repo.unlock)
200
self.assertEqual(keys, set(repo.texts.keys()))
201
self.assertEqual(parents, repo.texts.get_parent_map(repo.texts.keys()))
203
def test_attribute_text_store(self):
    """The repository exposes ``texts`` as a VersionedFiles object."""
    working_tree = self.make_branch_and_tree('tree')
    repository_under_test = working_tree.branch.repository
    texts = repository_under_test.texts
    self.assertIsInstance(texts, versionedfile.VersionedFiles)
210
def test_exposed_versioned_files_are_marked_dirty(self):
# Grabs the versioned-file attributes of a repository and asserts that
# add_lines on them raises ObjectNotLocked.
211
repo = self.make_repository('.')
# NOTE(review): gutter numbering has gaps throughout this method
# (211 -> 213, 215 -> 217, and every other line from 217 to 223); the
# first three assertRaises calls below are unterminated because their
# argument lines are not visible here -- confirm against the full file.
213
signatures = repo.signatures
214
revisions = repo.revisions
215
inventories = repo.inventories
217
self.assertRaises(errors.ObjectNotLocked,
219
self.assertRaises(errors.ObjectNotLocked,
221
self.assertRaises(errors.ObjectNotLocked,
223
self.assertRaises(errors.ObjectNotLocked,
224
signatures.add_lines, ('foo',), [], [])
225
self.assertRaises(errors.ObjectNotLocked,
226
revisions.add_lines, ('foo',), [], [])
227
self.assertRaises(errors.ObjectNotLocked,
228
inventories.add_lines, ('foo',), [], [])
113
230
def test_clone_to_default_format(self):
114
231
#TODO: Test that cloning a repository preserves all the information
514
659
repository.iter_files_bytes(
515
660
[('file3-id', 'rev3', 'file1-notpresent')]))
662
def test_item_keys_introduced_by(self):
# Commits one file, signs the revision, then compares the output of
# repo.item_keys_introduced_by(['rev_id']) with an expected list.
663
# Make a repo with one revision and one versioned file.
664
tree = self.make_branch_and_tree('t')
665
self.build_tree(['t/foo'])
666
tree.add('foo', 'file1')
667
tree.commit('message', rev_id='rev_id')
668
repo = tree.branch.repository
# NOTE(review): gutter jumps 668 -> 670 and 672 -> 675; the lines around
# the write group (presumably lock/unlock calls) are not visible here.
670
repo.start_write_group()
671
repo.sign_revision('rev_id', gpg.LoopbackGPGStrategy(None))
672
repo.commit_write_group()
675
self.addCleanup(repo.unlock)
677
# Item keys will be in this order, for maximum convenience for
678
# generating data to insert into knit repository:
# NOTE(review): gutter jumps 678 -> 683; the rest of this comment is not
# visible here.
683
expected_item_keys = [
684
('file', 'file1', ['rev_id']),
685
('inventory', None, ['rev_id']),
686
('signatures', None, ['rev_id']),
687
('revisions', None, ['rev_id'])]
688
item_keys = list(repo.item_keys_introduced_by(['rev_id']))
# NOTE(review): gutter jumps 688 -> 690; the opening line of the list
# comprehension that the next two lines belong to is not visible here.
690
(kind, file_id, list(versions))
691
for (kind, file_id, versions) in item_keys]
693
if repo.supports_rich_root():
694
# Check for the root versioned file in the item_keys, then remove
695
# it from streamed_names so we can compare that with
696
# expected_record_names.
697
# Note that the file keys can be in any order, so this test is
698
# written to allow that.
699
inv = repo.get_inventory('rev_id')
700
root_item_key = ('file', inv.root.file_id, ['rev_id'])
701
self.assertTrue(root_item_key in item_keys)
702
item_keys.remove(root_item_key)
704
self.assertEqual(expected_item_keys, item_keys)
517
706
def test_get_graph(self):
518
707
"""Bare-bones smoketest that all repositories implement get_graph."""
519
708
repo = self.make_repository('repo')
583
772
repo._check_for_inconsistent_revision_parents()
585
774
def test_add_signature_text(self):
# NOTE(review): two bodies appear under this def. Gutter numbers 586-603
# (branch-builder based, with a supports_revision_signatures check) and
# 775-788 (make_repository based) look like the removed and added sides of
# a diff hunk rendered one after the other -- confirm which one the real
# file contains. Both visible variants add the text
# 'This might be a signature' for revision 'A' and read it back with
# get_signature_text.
586
builder = self.make_branch_builder('.')
587
builder.start_series()
588
builder.build_snapshot('A', None, [
589
('add', ('', 'root-id', 'directory', None))])
590
builder.finish_series()
591
b = builder.get_branch()
# NOTE(review): gutter jumps 591 -> 593; the line before this cleanup is
# not visible here.
593
self.addCleanup(b.unlock)
594
b.repository.start_write_group()
595
self.addCleanup(b.repository.abort_write_group)
596
if b.repository._format.supports_revision_signatures:
597
b.repository.add_signature_text('A', 'This might be a signature')
598
self.assertEqual('This might be a signature',
599
b.repository.get_signature_text('A'))
# NOTE(review): gutter jumps 599 -> 601; presumably the 'else:' that
# guards the UnsupportedOperation assertion is not visible here.
601
self.assertRaises(errors.UnsupportedOperation,
602
b.repository.add_signature_text, 'A',
603
'This might be a signature')
775
repo = self.make_repository('repo')
# NOTE(review): gutter jumps 775 -> 777; the line before this cleanup is
# not visible here.
777
self.addCleanup(repo.unlock)
778
repo.start_write_group()
779
self.addCleanup(repo.abort_write_group)
780
inv = inventory.Inventory(revision_id='A')
781
inv.root.revision = 'A'
782
repo.add_inventory('A', inv, [])
783
repo.add_revision('A', _mod_revision.Revision(
784
'A', committer='A', timestamp=0,
785
inventory_sha1='', timezone=0, message='A'))
786
repo.add_signature_text('A', 'This might be a signature')
787
self.assertEqual('This might be a signature',
788
repo.get_signature_text('A'))
790
def test_add_revision_inventory_sha1(self):
# Records the value returned by add_inventory() in a reference repository,
# then checks that a revision added with inv=inv to a second repository
# reports the same value as its inventory_sha1.
791
inv = inventory.Inventory(revision_id='A')
792
inv.root.revision = 'A'
793
inv.root.file_id = 'fixed-root'
794
# Insert the inventory on its own to an identical repository, to get
# NOTE(review): gutter jumps 794 -> 796; the rest of the comment above is
# not visible here.
796
reference_repo = self.make_repository('reference_repo')
797
reference_repo.lock_write()
798
reference_repo.start_write_group()
799
inv_sha1 = reference_repo.add_inventory('A', inv, [])
800
# The write group is aborted; only the returned sha1 is kept.
reference_repo.abort_write_group()
801
reference_repo.unlock()
802
# Now insert a revision with this inventory, and it should get the same
# NOTE(review): gutter jumps 802 -> 804 and 804 -> 806; the rest of the
# comment above and the line before start_write_group (presumably a lock
# call) are not visible here.
804
repo = self.make_repository('repo')
806
repo.start_write_group()
807
# root_id is assigned but not used in the visible lines.
root_id = inv.root.file_id
808
repo.texts.add_lines(('fixed-root', 'A'), [], [])
809
repo.add_revision('A', _mod_revision.Revision(
810
'A', committer='B', timestamp=0,
811
timezone=0, message='C'), inv=inv)
812
repo.commit_write_group()
# NOTE(review): gutter jumps 812 -> 815; two lines are not visible here.
# Also, assertEquals is a deprecated unittest alias of assertEqual.
815
self.assertEquals(inv_sha1, repo.get_revision('A').inventory_sha1)
818
def test_install_revisions(self):
# Signs revision 'A' in a source repository, installs its revision, tree
# and signature into a second repository with
# repository.install_revisions, and checks both can be read back.
819
wt = self.make_branch_and_tree('source')
820
wt.commit('A', allow_pointless=True, rev_id='A')
821
repo = wt.branch.repository
# NOTE(review): gutter jumps 821 -> 823 and 825 -> 828; the lines around
# the write group (presumably lock/unlock calls) are not visible here.
823
repo.start_write_group()
824
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
825
repo.commit_write_group()
828
self.addCleanup(repo.unlock)
829
repo2 = self.make_repository('repo2')
830
revision = repo.get_revision('A')
831
tree = repo.revision_tree('A')
832
signature = repo.get_signature_text('A')
# NOTE(review): gutter jumps 832 -> 834; the line before this cleanup
# (presumably a repo2 lock call) is not visible here.
834
self.addCleanup(repo2.unlock)
835
repository.install_revisions(repo2, [(revision, tree, signature)])
836
self.assertEqual(revision, repo2.get_revision('A'))
837
self.assertEqual(signature, repo2.get_signature_text('A'))
605
839
# XXX: this helper is duplicated from tests.test_repository
606
840
def make_remote_repository(self, path, shared=False):
903
1137
self.assertThat(repo.lock_write, ReturnsUnlockable(repo))
1140
class TestCaseWithComplexRepository(per_repository.TestCaseWithRepository):
# Fixture code: builds tree 'a' with an 'orphan' inventory entry and
# revisions rev1..rev4, where rev3 and rev4 reference ghost parents, and
# stores the tree's bzrdir on self.bzrdir for the tests in this class.
# NOTE(review): gutter jumps 1140 -> 1143; the method header that the
# super().setUp() call belongs to (presumably 'def setUp(self):') is not
# visible here -- confirm.
1143
super(TestCaseWithComplexRepository, self).setUp()
1144
tree_a = self.make_branch_and_tree('a')
1145
self.bzrdir = tree_a.branch.bzrdir
1146
# add a corrupt inventory 'orphan'
1147
# this may need some generalising for knits.
# NOTE(review): gutter numbering has gaps from 1147 to 1161; the control
# flow that chooses between commit_write_group and abort_write_group
# below (and any locking) is not visible here.
1150
tree_a.branch.repository.start_write_group()
1152
inv_file = tree_a.branch.repository.inventories
1153
inv_file.add_lines(('orphan',), [], [])
1155
tree_a.branch.repository.commit_write_group()
1158
tree_a.branch.repository.abort_write_group()
1161
# add a real revision 'rev1'
1162
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
1163
# add a real revision 'rev2' based on rev1
1164
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
1165
# add a reference to a ghost
1166
tree_a.add_parent_tree_id('ghost1')
# NOTE(review): gutter jumps 1166 -> 1168; the 'try:' that the except
# clause below belongs to is not visible here.
1168
tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
1169
except errors.RevisionNotPresent:
1170
# The test is reported as not applicable when the commit raises
# RevisionNotPresent.
raise tests.TestNotApplicable(
1171
"Cannot test with ghosts for this format.")
1172
# add another reference to a ghost, and a second ghost.
1173
tree_a.add_parent_tree_id('ghost1')
1174
tree_a.add_parent_tree_id('ghost2')
1175
tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
1177
def test_revision_trees(self):
    """revision_trees() must match one revision_tree() call per revision.

    Both lists must have the same length, and each pair of trees must
    report no changes between them.
    """
    wanted_ids = ['rev1', 'rev2', 'rev3', 'rev4']
    repo = self.bzrdir.open_repository()
    repo.lock_read()
    self.addCleanup(repo.unlock)
    bulk_trees = list(repo.revision_trees(wanted_ids))
    single_trees = [repo.revision_tree(rev_id) for rev_id in wanted_ids]
    self.assertEqual(len(bulk_trees), len(single_trees))
    for bulk_tree, single_tree in zip(bulk_trees, single_trees):
        delta = single_tree.changes_from(bulk_tree)
        self.assertFalse(delta.has_changed())
1188
def test_get_deltas_for_revisions(self):
# Compares the deltas from get_deltas_for_revisions() with deltas fetched
# one at a time through get_revision_delta().
1189
repository = self.bzrdir.open_repository()
1190
repository.lock_read()
1191
self.addCleanup(repository.unlock)
1192
revisions = [repository.get_revision(r) for r in
1193
['rev1', 'rev2', 'rev3', 'rev4']]
1194
deltas1 = list(repository.get_deltas_for_revisions(revisions))
1195
deltas2 = [repository.get_revision_delta(r.revision_id) for r in
# NOTE(review): gutter jumps 1195 -> 1197; the line that closes the list
# comprehension above is not visible here -- confirm.
1197
self.assertEqual(deltas1, deltas2)
1199
def test_all_revision_ids(self):
    """all_revision_ids() yields exactly rev1 through rev4."""
    expected_ids = set(['rev1', 'rev2', 'rev3', 'rev4'])
    repo = self.bzrdir.open_repository()
    found_ids = set(repo.all_revision_ids())
    self.assertEqual(expected_ids, found_ids)
1204
def test_get_ancestry_missing_revision(self):
# Asserts that get_ancestry('orphan') raises NoSuchRevision on the
# repository opened from self.bzrdir.
1205
# get_ancestry(revision that is in some data but not fully installed
# NOTE(review): gutter jumps 1205 -> 1207; one line (possibly the rest of
# the comment above) is not visible here -- confirm.
1207
self.assertRaises(errors.NoSuchRevision,
1208
self.bzrdir.open_repository().get_ancestry, 'orphan')
1210
def test_get_unordered_ancestry(self):
    """get_ancestry('rev3') has the same members with topo_sorted=False."""
    repo = self.bzrdir.open_repository()
    default_ancestry = set(repo.get_ancestry('rev3'))
    unsorted_ancestry = set(repo.get_ancestry('rev3', topo_sorted=False))
    self.assertEqual(default_ancestry, unsorted_ancestry)
1215
def test_reserved_id(self):
# Asserts that add_inventory, add_inventory_by_delta and add_revision
# raise ReservedId when given the id 'reserved:'.
1216
repo = self.make_repository('repository')
# NOTE(review): gutter numbering has gaps throughout this method
# (1216 -> 1218, 1218 -> 1220, 1220 -> 1222, 1224 -> 1227); the first and
# last assertRaises calls are unterminated because their argument lines
# are not visible, as is any locking or try/finally structure around the
# write group -- confirm.
1218
repo.start_write_group()
1220
self.assertRaises(errors.ReservedId, repo.add_inventory, 'reserved:',
1222
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
1223
"foo", [], 'reserved:', None)
1224
self.assertRaises(errors.ReservedId, repo.add_revision, 'reserved:',
1227
repo.abort_write_group()
1231
class TestCaseWithCorruptRepository(per_repository.TestCaseWithRepository):
# Fixture code: creates the repository 'inventory_with_unnecessary_ghost'
# holding revision 'ghost' (inventory added with no parents, revision
# parent_ids set to ['the_ghost']) and then revision 'the_ghost'.
# NOTE(review): gutter jumps 1231 -> 1234; the method header that the
# super().setUp() call belongs to (presumably 'def setUp(self):') is not
# visible here -- confirm.
1234
super(TestCaseWithCorruptRepository, self).setUp()
1235
# an inventory with no parents and the revision has parents..
# NOTE(review): gutter jumps 1235 -> 1237 and 1237 -> 1239; the lines in
# between (possibly more comment text and a lock call) are not visible.
1237
repo = self.make_repository('inventory_with_unnecessary_ghost')
1239
repo.start_write_group()
1240
inv = inventory.Inventory(revision_id = 'ghost')
1241
inv.root.revision = 'ghost'
1242
if repo.supports_rich_root():
1243
# Rich-root repositories also get a text entry for the root.
root_id = inv.root.file_id
1244
repo.texts.add_lines((root_id, 'ghost'), [], [])
1245
sha1 = repo.add_inventory('ghost', inv, [])
1246
rev = _mod_revision.Revision(
1247
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
1248
message="Message", inventory_sha1=sha1, revision_id='ghost')
1249
rev.parent_ids = ['the_ghost']
# NOTE(review): gutter jumps 1249 -> 1251; the 'try:' that the except
# clause below belongs to is not visible here.
1251
repo.add_revision('ghost', rev)
1252
except (errors.NoSuchRevision, errors.RevisionNotPresent):
1253
# The test is reported as not applicable when add_revision raises one of
# these errors.
raise tests.TestNotApplicable(
1254
"Cannot test with ghosts for this format.")
1256
inv = inventory.Inventory(revision_id = 'the_ghost')
1257
inv.root.revision = 'the_ghost'
1258
if repo.supports_rich_root():
1259
root_id = inv.root.file_id
1260
repo.texts.add_lines((root_id, 'the_ghost'), [], [])
1261
sha1 = repo.add_inventory('the_ghost', inv, [])
1262
rev = _mod_revision.Revision(
1263
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
1264
message="Message", inventory_sha1=sha1, revision_id='the_ghost')
# NOTE(review): gutter jumps 1264 -> 1266; one line is not visible here.
1266
repo.add_revision('the_ghost', rev)
1267
# check its setup usefully
1268
inv_weave = repo.inventories
1269
possible_parents = (None, (('ghost',),))
1270
self.assertSubset(inv_weave.get_parent_map([('ghost',)])[('ghost',)],
# NOTE(review): gutter jumps 1270 -> 1272; the closing argument of the
# assertSubset call above (presumably possible_parents) is not visible.
1272
repo.commit_write_group()
1275
def test_corrupt_revision_access_asserts_if_reported_wrong(self):
# Opens the 'inventory_with_unnecessary_ghost' repository and compares
# get_ancestry('ghost') with [None, 'the_ghost', 'ghost'], setting
# reported_wrong when they differ. An assertion further down expects
# get_revision('ghost') to raise CorruptRepository.
1276
repo_url = self.get_url('inventory_with_unnecessary_ghost')
1277
repo = repository.Repository.open(repo_url)
1278
reported_wrong = False
# NOTE(review): gutter jumps 1278 -> 1280; the 'try:' that the except
# clause below belongs to is not visible here.
1280
if repo.get_ancestry('ghost') != [None, 'the_ghost', 'ghost']:
1281
reported_wrong = True
1282
except errors.CorruptRepository:
1283
# caught the bad data:
# NOTE(review): gutter jumps 1283 -> 1285 and 1285 -> 1287; the handler
# body and the body of the 'if not reported_wrong' branch are not visible,
# so it is unclear from here which branch the final assertion sits in.
1285
if not reported_wrong:
1287
self.assertRaises(errors.CorruptRepository, repo.get_revision, 'ghost')
1289
def test_corrupt_revision_get_revision_reconcile(self):
    """Call get_revision_reconcile('ghost') on the reopened repository.

    There is no explicit assertion; the call only needs to complete
    without raising.
    """
    location = self.get_url('inventory_with_unnecessary_ghost')
    reopened = repository.Repository.open(location)
    reopened.get_revision_reconcile('ghost')
906
1295
# FIXME: document why this is a TestCaseWithTransport rather than a
907
1296
# TestCaseWithRepository
908
1297
class TestEscaping(tests.TestCaseWithTransport):