# Copyright (C) 2011, 2012, 2016 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for repository implementations - tests a repository format."""
23
repository as _mod_repository,
24
revision as _mod_revision,
30
from bzrlib.symbol_versioning import deprecated_in
31
from bzrlib.tests.matchers import MatchesAncestry
32
from bzrlib.tests.per_repository_vf import (
33
TestCaseWithRepository,
34
all_repository_vf_format_scenarios,
36
from bzrlib.tests.scenarios import load_tests_apply_scenarios
39
# Module-level alias: expose the imported load_tests_apply_scenarios under the
# name `load_tests` (presumably the hook the test loader looks up so that each
# class's `scenarios` are applied -- confirm against bzrlib.tests.scenarios).
load_tests = load_tests_apply_scenarios
42
# Test case for repository formats; its `scenarios` attribute is filled from
# all_repository_vf_format_scenarios() (presumably one scenario per repository
# format -- confirm against bzrlib.tests.per_repository_vf).
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integer below looks like a leftover source line number, not code.
class TestRepository(TestCaseWithRepository):
44
scenarios = all_repository_vf_format_scenarios()
46
def assertFormatAttribute(self, attribute, allowed_values):
    """Assert that the format has an attribute 'attribute'.

    A repository is created at 'repo' and the named attribute is read from
    its ``_format``; the value must be one of ``allowed_values``.

    :param attribute: Name of the attribute to read from ``repo._format``.
    :param allowed_values: Collection of values the attribute may take.
    """
    repo = self.make_repository('repo')
    # The single value is wrapped in a list so it can be handed to
    # assertSubset together with the collection of allowed values.
    self.assertSubset([getattr(repo._format, attribute)], allowed_values)
51
def test_attribute__fetch_order(self):
    """Test the _fetch_order attribute.

    The format's ``_fetch_order`` must be either 'topological' or
    'unordered'.
    """
    self.assertFormatAttribute('_fetch_order', ('topological', 'unordered'))
55
def test_attribute__fetch_uses_deltas(self):
    """Test the _fetch_uses_deltas attribute.

    The format's ``_fetch_uses_deltas`` must be either True or False.
    """
    self.assertFormatAttribute('_fetch_uses_deltas', (True, False))
59
def test_attribute_inventories_store(self):
    """Test the existence of the inventories attribute.

    The repository behind a freshly made tree must expose ``inventories``
    as a ``versionedfile.VersionedFiles`` instance.
    """
    tree = self.make_branch_and_tree('tree')
    repo = tree.branch.repository
    self.assertIsInstance(repo.inventories, versionedfile.VersionedFiles)
65
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_attribute_inventories_basics(self):
66
"""Test basic aspects of the inventories attribute."""
67
tree = self.make_branch_and_tree('tree')
68
repo = tree.branch.repository
69
# The commit's return value is wrapped in a 1-tuple so that it can be compared
# with the entries of repo.inventories.keys() below.
rev_id = (tree.commit('a'),)
# NOTE(review): the numbering jumps from 69 to 71; the statement that takes
# the lock released by the cleanup below is presumably missing -- confirm.
71
self.addCleanup(tree.unlock)
72
# After the single commit the inventories store must hold exactly that key.
self.assertEqual(set([rev_id]), set(repo.inventories.keys()))
74
def test_attribute_revision_store(self):
    """Test the existence of the revisions attribute.

    The repository behind a freshly made tree must expose ``revisions`` as
    a ``versionedfile.VersionedFiles`` instance.
    """
    tree = self.make_branch_and_tree('tree')
    repo = tree.branch.repository
    self.assertIsInstance(repo.revisions, versionedfile.VersionedFiles)
81
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_attribute_revision_store_basics(self):
82
"""Test the basic behaviour of the revisions attribute."""
83
tree = self.make_branch_and_tree('tree')
84
repo = tree.branch.repository
# NOTE(review): the numbering jumps from 84 to 87, so two statements are
# missing here; what they were cannot be told from the visible code.
87
# Before any commit the revisions store must have no keys.
self.assertEqual(set(), set(repo.revisions.keys()))
88
# Keys are compared as 1-tuples, so the commit's return value is wrapped.
revid = (tree.commit("foo"),)
89
self.assertEqual(set([revid]), set(repo.revisions.keys()))
90
# The first revision is expected to map to an empty parents tuple.
self.assertEqual({revid:()},
91
repo.revisions.get_parent_map([revid]))
# NOTE(review): the numbering jumps from 91 to 94; two more statements are
# missing here.
94
# Build a second tree from the first, commit on both sides, then merge so
# that a revision with two parents exists.
tree2 = self.make_branch_and_tree('tree2')
95
tree2.pull(tree.branch)
96
left_id = (tree2.commit('left'),)
97
right_id = (tree.commit('right'),)
98
tree.merge_from_branch(tree2.branch)
99
merge_id = (tree.commit('merged'),)
# NOTE(review): line 100 is missing; presumably it takes the lock that the
# cleanup below releases -- confirm.
101
self.addCleanup(repo.unlock)
102
self.assertEqual(set([revid, left_id, right_id, merge_id]),
103
set(repo.revisions.keys()))
104
# Expected graph: both 'left' and 'right' descend from the first revision and
# the merge lists (right_id, left_id) as its parents, in that order.
self.assertEqual({revid:(), left_id:(revid,), right_id:(revid,),
105
merge_id:(right_id, left_id)},
106
repo.revisions.get_parent_map(repo.revisions.keys()))
108
def test_attribute_signature_store(self):
    """Test the existence of the signatures attribute.

    The repository behind a freshly made tree must expose ``signatures`` as
    a ``versionedfile.VersionedFiles`` instance.
    """
    tree = self.make_branch_and_tree('tree')
    repo = tree.branch.repository
    self.assertIsInstance(repo.signatures, versionedfile.VersionedFiles)
115
# Checks that the versioned-file objects obtained from a repository raise
# errors.ObjectNotLocked when used at the point of the assertions below.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_exposed_versioned_files_are_marked_dirty(self):
116
repo = self.make_repository('.')
# NOTE(review): line 117 is missing before, and line 121 after, these three
# attribute reads; since every assertion expects ObjectNotLocked, the reads
# are presumably bracketed by a lock/unlock pair -- confirm.
118
signatures = repo.signatures
119
revisions = repo.revisions
120
inventories = repo.inventories
122
# NOTE(review): the argument lines of the next three assertRaises calls
# (numbers 123, 125 and 127) are missing, so the callables being checked
# cannot be determined from the visible code.
self.assertRaises(errors.ObjectNotLocked,
124
self.assertRaises(errors.ObjectNotLocked,
126
self.assertRaises(errors.ObjectNotLocked,
128
# Adding lines to each store must also be refused with ObjectNotLocked.
self.assertRaises(errors.ObjectNotLocked,
129
signatures.add_lines, ('foo',), [], [])
130
self.assertRaises(errors.ObjectNotLocked,
131
revisions.add_lines, ('foo',), [], [])
132
self.assertRaises(errors.ObjectNotLocked,
133
inventories.add_lines, ('foo',), [], [])
135
def test__get_sink(self):
    """Check that _get_sink() returns a vf_repository.StreamSink."""
    repo = self.make_repository('repo')
    sink = repo._get_sink()
    self.assertIsInstance(sink, vf_repository.StreamSink)
140
def test_get_serializer_format(self):
    """Check get_serializer_format() against the serializer's format_num."""
    repo = self.make_repository('.')
    # Named 'serializer_format' rather than 'format' so the builtin is not
    # shadowed.
    serializer_format = repo.get_serializer_format()
    self.assertEqual(repo._serializer.format_num, serializer_format)
145
# Checks that a revision added together with an inventory records the same
# inventory sha1 that add_inventory() returned for that inventory on its own.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_add_revision_inventory_sha1(self):
146
inv = inventory.Inventory(revision_id='A')
147
inv.root.revision = 'A'
148
inv.root.file_id = 'fixed-root'
149
# Insert the inventory on its own to an identical repository, to get
# NOTE(review): the rest of the sentence above (line 150) is missing; the code
# below keeps add_inventory()'s return value as inv_sha1.
151
reference_repo = self.make_repository('reference_repo')
152
reference_repo.lock_write()
153
reference_repo.start_write_group()
154
inv_sha1 = reference_repo.add_inventory('A', inv, [])
155
# The write group is aborted rather than committed: only the returned sha1 is
# used from the reference repository.
reference_repo.abort_write_group()
156
reference_repo.unlock()
157
# Now insert a revision with this inventory, and it should get the same
# NOTE(review): the rest of the sentence above (line 158) is missing.
159
repo = self.make_repository('repo')
# NOTE(review): line 160 is missing; reference_repo above is write-locked
# before its write group starts, so presumably the same happens here -- confirm.
161
repo.start_write_group()
162
# root_id is assigned but not used in the visible code; the literal
# 'fixed-root' is passed below instead.
root_id = inv.root.file_id
163
repo.texts.add_lines(('fixed-root', 'A'), [], [])
164
repo.add_revision('A', _mod_revision.Revision(
165
'A', committer='B', timestamp=0,
166
timezone=0, message='C'), inv=inv)
167
repo.commit_write_group()
# NOTE(review): lines 168 and 169 are missing here.
170
self.assertEqual(inv_sha1, repo.get_revision('A').inventory_sha1)
173
# Checks that vf_repository.install_revisions() copies a revision and its
# signature text from one repository into another.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_install_revisions(self):
174
wt = self.make_branch_and_tree('source')
175
wt.commit('A', allow_pointless=True, rev_id='A')
176
repo = wt.branch.repository
# NOTE(review): line 177 is missing; presumably a lock is taken before the
# write group starts -- confirm.
178
repo.start_write_group()
179
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
180
repo.commit_write_group()
# NOTE(review): lines 181 and 182 are missing; the cleanup below implies the
# repository is locked at this point.
183
self.addCleanup(repo.unlock)
184
repo2 = self.make_repository('repo2')
185
# Collect the (revision, tree, signature) triple that is handed to
# install_revisions below.
revision = repo.get_revision('A')
186
tree = repo.revision_tree('A')
187
signature = repo.get_signature_text('A')
# NOTE(review): line 188 is missing; presumably it locks repo2, which the
# cleanup below unlocks -- confirm.
189
self.addCleanup(repo2.unlock)
190
vf_repository.install_revisions(repo2, [(revision, tree, signature)])
191
# The target repository must now return an equal revision and signature text.
self.assertEqual(revision, repo2.get_revision('A'))
192
self.assertEqual(signature, repo2.get_signature_text('A'))
194
def test_attribute_text_store(self):
    """Test the existence of the texts attribute.

    The repository behind a freshly made tree must expose ``texts`` as a
    ``versionedfile.VersionedFiles`` instance.
    """
    tree = self.make_branch_and_tree('tree')
    repo = tree.branch.repository
    self.assertIsInstance(repo.texts, versionedfile.VersionedFiles)
201
# Checks that iter_inventories() yields inventories in the order of the
# revision ids it was given.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
# Line 202 (directly after the def) is missing.
def test_iter_inventories_is_ordered(self):
203
tree = self.make_branch_and_tree('a')
204
first_revision = tree.commit('')
205
second_revision = tree.commit('')
# NOTE(review): line 206 is missing; presumably it takes the lock that the
# cleanup below releases -- confirm.
207
self.addCleanup(tree.unlock)
208
revs = (first_revision, second_revision)
209
invs = tree.branch.repository.iter_inventories(revs)
210
# Pair each requested revision id with the inventory yielded at the same
# position; the two lines after the loop header form its body.
for rev_id, inv in zip(revs, invs):
211
self.assertEqual(rev_id, inv.revision_id)
212
self.assertIsInstance(inv, inventory.CommonInventory)
214
# Checks the (kind, file_id, versions) triples produced by
# item_keys_introduced_by() for a single revision.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_item_keys_introduced_by(self):
215
# Make a repo with one revision and one versioned file.
216
tree = self.make_branch_and_tree('t')
217
self.build_tree(['t/foo'])
218
tree.add('foo', 'file1')
219
tree.commit('message', rev_id='rev_id')
220
repo = tree.branch.repository
# NOTE(review): line 221 is missing here.
222
repo.start_write_group()
# NOTE(review): line 223 is missing; the 'except' below needs a matching
# 'try:', presumably on that line.
224
repo.sign_revision('rev_id', gpg.LoopbackGPGStrategy(None))
225
except errors.UnsupportedOperation:
# NOTE(review): lines 226 and 227 are missing, so which branch the assignment
# below belongs to cannot be determined from the visible code.
228
signature_texts = ['rev_id']
229
repo.commit_write_group()
# NOTE(review): lines 230 and 231 are missing; the cleanup below implies the
# repository is locked at this point.
232
self.addCleanup(repo.unlock)
234
# Item keys will be in this order, for maximum convenience for
235
# generating data to insert into knit repository:
# NOTE(review): lines 236-239 (presumably the rest of this comment) are missing.
240
expected_item_keys = [
241
('file', 'file1', ['rev_id']),
242
('inventory', None, ['rev_id']),
243
('signatures', None, signature_texts),
244
('revisions', None, ['rev_id'])]
245
item_keys = list(repo.item_keys_introduced_by(['rev_id']))
# NOTE(review): line 246 is missing; the next two lines look like the body of
# a list comprehension that turns each 'versions' into a list, and its
# opening line is the missing one.
247
(kind, file_id, list(versions))
248
for (kind, file_id, versions) in item_keys]
250
if repo.supports_rich_root():
251
# Check for the root versioned file in the item_keys, then remove
252
# it from streamed_names so we can compare that with
253
# expected_record_names.
254
# Note that the file keys can be in any order, so this test is
255
# written to allow that.
256
inv = repo.get_inventory('rev_id')
257
root_item_key = ('file', inv.root.file_id, ['rev_id'])
258
self.assertTrue(root_item_key in item_keys)
259
item_keys.remove(root_item_key)
261
self.assertEqual(expected_item_keys, item_keys)
263
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_attribute_text_store_basics(self):
264
"""Test the basic behaviour of the text store."""
265
tree = self.make_branch_and_tree('tree')
266
repo = tree.branch.repository
# NOTE(review): line 267 is missing; 'file_id' is used below but its
# assignment is not visible (later calls pass the literal 'Foo:Bar' where
# file_id is used here, so it is presumably that value -- confirm).
268
file_key = (file_id,)
# NOTE(review): lines 269 and 270 are missing here.
271
# Before anything is committed the texts store must have no keys.
self.assertEqual(set(), set(repo.texts.keys()))
272
tree.add(['foo'], [file_id], ['file'])
273
tree.put_file_bytes_non_atomic(file_id, 'content\n')
# NOTE(review): line 274 is missing; the 'except' below needs a matching
# 'try:', presumably on that line.
275
rev_key = (tree.commit("foo"),)
276
except errors.IllegalPath:
277
# The test is reported as not applicable when the commit raises IllegalPath.
raise tests.TestNotApplicable(
278
'file_id %r cannot be stored on this'
279
' platform for this repo format' % (file_id,))
280
# With rich-root data the expected keys and parents start with an entry for
# the tree root at the first revision.
if repo._format.rich_root_data:
281
root_commit = (tree.get_root_id(),) + rev_key
282
keys = set([root_commit])
283
parents = {root_commit:()}
# NOTE(review): lines 284-286 are missing; presumably they initialise 'keys'
# and 'parents' for the other branch -- confirm.
287
# Text keys are built by concatenating the file key and the revision key.
keys.add(file_key + rev_key)
288
parents[file_key + rev_key] = ()
289
self.assertEqual(keys, set(repo.texts.keys()))
290
self.assertEqual(parents,
291
repo.texts.get_parent_map(repo.texts.keys()))
# NOTE(review): lines 292 and 293 are missing here.
294
# Change the file on two diverging trees, then merge them so that one text
# ends up with two parents.
tree2 = self.make_branch_and_tree('tree2')
295
tree2.pull(tree.branch)
296
tree2.put_file_bytes_non_atomic('Foo:Bar', 'right\n')
297
right_key = (tree2.commit('right'),)
298
keys.add(file_key + right_key)
299
parents[file_key + right_key] = (file_key + rev_key,)
300
tree.put_file_bytes_non_atomic('Foo:Bar', 'left\n')
301
left_key = (tree.commit('left'),)
302
keys.add(file_key + left_key)
303
parents[file_key + left_key] = (file_key + rev_key,)
304
tree.merge_from_branch(tree2.branch)
305
tree.put_file_bytes_non_atomic('Foo:Bar', 'merged\n')
# NOTE(review): lines 306 and 307 are missing; they presumably hold a 'try:'
# and the call guarded by the 'except' below.
308
except errors.UnsupportedOperation:
# NOTE(review): line 309 (the body of the except clause) is missing.
310
merge_key = (tree.commit('merged'),)
311
keys.add(file_key + merge_key)
312
# The merged text is expected to list the left text first, then the right.
parents[file_key + merge_key] = (file_key + left_key,
313
file_key + right_key)
# NOTE(review): line 314 is missing; presumably it takes the lock that the
# cleanup below releases -- confirm.
315
self.addCleanup(repo.unlock)
316
self.assertEqual(keys, set(repo.texts.keys()))
317
self.assertEqual(parents, repo.texts.get_parent_map(repo.texts.keys()))
320
# Test case whose fixture builds tree 'a' with revisions rev1..rev4 and stores
# the tree's bzrdir on self.bzrdir for the tests to reopen.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
class TestCaseWithComplexRepository(TestCaseWithRepository):
322
scenarios = all_repository_vf_format_scenarios()
# NOTE(review): line 324 is missing; the super().setUp() call below implies
# that it is the 'def setUp(self):' line.
325
super(TestCaseWithComplexRepository, self).setUp()
326
tree_a = self.make_branch_and_tree('a')
327
self.bzrdir = tree_a.branch.bzrdir
328
# add a corrupt inventory 'orphan'
329
# this may need some generalising for knits.
# NOTE(review): lines 330 and 331 are missing here.
332
tree_a.branch.repository.start_write_group()
# NOTE(review): line 333 is missing here.
334
inv_file = tree_a.branch.repository.inventories
335
inv_file.add_lines(('orphan',), [], [])
# NOTE(review): lines 336, 338, 339, 341 and 342 are missing, so the control
# flow that selects between the commit and the abort below is not visible.
337
tree_a.branch.repository.commit_write_group()
340
tree_a.branch.repository.abort_write_group()
343
# add a real revision 'rev1'
344
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
345
# add a real revision 'rev2' based on rev1
346
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
347
# add a reference to a ghost
348
tree_a.add_parent_tree_id('ghost1')
# NOTE(review): line 349 is missing; the 'except' below needs a matching
# 'try:', presumably on that line.
350
tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
351
except errors.RevisionNotPresent:
352
# The test is reported as not applicable when the commit raises
# RevisionNotPresent.
raise tests.TestNotApplicable(
353
"Cannot test with ghosts for this format.")
354
# add another reference to a ghost, and a second ghost.
355
tree_a.add_parent_tree_id('ghost1')
356
tree_a.add_parent_tree_id('ghost2')
357
tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
359
def test_revision_trees(self):
    """Check that revision_trees() agrees with revision_tree().

    For rev1..rev4 the trees returned in bulk must show no changes when
    compared with the trees fetched one revision at a time.
    """
    revision_ids = ['rev1', 'rev2', 'rev3', 'rev4']
    repository = self.bzrdir.open_repository()
    repository.lock_read()
    self.addCleanup(repository.unlock)
    trees1 = list(repository.revision_trees(revision_ids))
    trees2 = [repository.revision_tree(t) for t in revision_ids]
    # zip() stops at the shorter sequence, so compare the lengths first.
    self.assertEqual(len(trees1), len(trees2))
    for tree1, tree2 in zip(trees1, trees2):
        self.assertFalse(tree2.changes_from(tree1).has_changed())
370
def test_get_deltas_for_revisions(self):
    """Check that bulk and per-revision delta lookups agree.

    The deltas produced by ``get_deltas_for_revisions`` for rev1..rev4 must
    equal those returned one at a time by ``get_revision_delta``.
    """
    repository = self.bzrdir.open_repository()
    repository.lock_read()
    self.addCleanup(repository.unlock)
    revisions = [repository.get_revision(r) for r in
                 ['rev1', 'rev2', 'rev3', 'rev4']]
    deltas1 = list(repository.get_deltas_for_revisions(revisions))
    # NOTE(review): the tail of this comprehension was missing from the
    # source; 'revisions' is the only sequence in scope whose items carry a
    # 'revision_id' -- confirm against the upstream file.
    deltas2 = [repository.get_revision_delta(r.revision_id) for r in
               revisions]
    self.assertEqual(deltas1, deltas2)
381
def test_all_revision_ids(self):
    """Check that all_revision_ids() reports exactly rev1..rev4."""
    # all_revision_ids -> all revisions
    # Compared as sets, so the order of the returned ids is not checked.
    self.assertEqual(set(['rev1', 'rev2', 'rev3', 'rev4']),
                     set(self.bzrdir.open_repository().all_revision_ids()))
386
# Checks that add_inventory, add_inventory_by_delta and add_revision raise
# errors.ReservedId.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_reserved_id(self):
387
repo = self.make_repository('repository')
# NOTE(review): line 388 is missing here.
389
repo.start_write_group()
# NOTE(review): line 390 is missing here.
391
self.assertRaises(errors.ReservedId, repo.add_inventory,
392
'reserved:', None, None)
393
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
394
"foo", [], 'reserved:', None)
395
self.assertRaises(errors.ReservedId, repo.add_revision,
# NOTE(review): lines 396 and 397 are missing, so the arguments passed to
# add_revision and whatever precedes the abort below are not visible.
398
repo.abort_write_group()
402
# Test case whose fixture builds the repository
# 'inventory_with_unnecessary_ghost': revision 'ghost' names 'the_ghost' as
# its parent while its inventory is added with an empty parent list.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
class TestCaseWithCorruptRepository(TestCaseWithRepository):
404
scenarios = all_repository_vf_format_scenarios()
# NOTE(review): line 406 is missing; the super().setUp() call below implies
# that it is the 'def setUp(self):' line.
407
super(TestCaseWithCorruptRepository, self).setUp()
408
# an inventory with no parents while its revision has parents..
# NOTE(review): line 409 is missing here.
410
repo = self.make_repository('inventory_with_unnecessary_ghost')
# NOTE(review): line 411 is missing here.
412
repo.start_write_group()
413
inv = inventory.Inventory(revision_id = 'ghost')
414
inv.root.revision = 'ghost'
415
# When the repository supports rich roots, a text entry is added for the
# inventory root as well.
if repo.supports_rich_root():
416
root_id = inv.root.file_id
417
repo.texts.add_lines((root_id, 'ghost'), [], [])
418
sha1 = repo.add_inventory('ghost', inv, [])
419
rev = _mod_revision.Revision(
420
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
421
message="Message", inventory_sha1=sha1, revision_id='ghost')
422
# The revision claims a parent although its inventory was added with none.
rev.parent_ids = ['the_ghost']
# NOTE(review): line 423 is missing; the 'except' below needs a matching
# 'try:', presumably on that line.
424
repo.add_revision('ghost', rev)
425
except (errors.NoSuchRevision, errors.RevisionNotPresent):
426
raise tests.TestNotApplicable(
427
"Cannot test with ghosts for this format.")
# NOTE(review): line 428 is missing here.
429
# Now add the parent revision 'the_ghost' itself, with its own inventory.
inv = inventory.Inventory(revision_id = 'the_ghost')
430
inv.root.revision = 'the_ghost'
431
if repo.supports_rich_root():
432
root_id = inv.root.file_id
433
repo.texts.add_lines((root_id, 'the_ghost'), [], [])
434
sha1 = repo.add_inventory('the_ghost', inv, [])
435
rev = _mod_revision.Revision(
436
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
437
message="Message", inventory_sha1=sha1, revision_id='the_ghost')
# NOTE(review): line 438 is missing here.
439
repo.add_revision('the_ghost', rev)
440
# check its setup usefully
441
inv_weave = repo.inventories
442
possible_parents = (None, (('ghost',),))
443
self.assertSubset(inv_weave.get_parent_map([('ghost',)])[('ghost',)],
# NOTE(review): line 444 is missing; it presumably supplies the second
# argument (possible_parents is otherwise unused) and closes the call -- confirm.
445
repo.commit_write_group()
448
# Opens the 'inventory_with_unnecessary_ghost' repository and relates the
# result of an ancestry match on 'ghost' to whether get_revision('ghost')
# raises errors.CorruptRepository.
# NOTE(review): extraction damage -- indentation is stripped and the bare
# integers between statements look like leftover source line numbers, not code.
def test_corrupt_revision_access_asserts_if_reported_wrong(self):
449
repo_url = self.get_url('inventory_with_unnecessary_ghost')
450
repo = _mod_repository.Repository.open(repo_url)
451
m = MatchesAncestry(repo, 'ghost')
452
reported_wrong = False
# NOTE(review): line 453 is missing; the 'except' below needs a matching
# 'try:', presumably on that line.
454
# A non-None result from m.match() sets the reported_wrong flag.
if m.match(['the_ghost', 'ghost']) is not None:
455
reported_wrong = True
456
except errors.CorruptRepository:
457
# caught the bad data:
# NOTE(review): line 458 (the body of the except clause) is missing.
459
if not reported_wrong:
# NOTE(review): line 460 (the body of this 'if') is missing, so what happens
# when nothing was reported wrong cannot be told from the visible code.
461
self.assertRaises(errors.CorruptRepository, repo.get_revision, 'ghost')
463
def test_corrupt_revision_get_revision_reconcile(self):
    """Call get_revision_reconcile('ghost') on the corrupt repository.

    There is no explicit assertion: the test passes as long as opening the
    'inventory_with_unnecessary_ghost' repository and making the call do
    not raise.
    """
    repo_url = self.get_url('inventory_with_unnecessary_ghost')
    repo = _mod_repository.Repository.open(repo_url)
    repo.get_revision_reconcile('ghost')