89
109
repo = tree.branch.repository
90
110
self.assertIsInstance(repo.signatures,
91
111
versionedfile.VersionedFiles)
113
def test_exposed_versioned_files_are_marked_dirty(self):
# Takes the signatures, revisions and inventories attributes of a new
# repository and asserts that calling add_lines on each of them raises
# errors.ObjectNotLocked.
# NOTE(review): this text looks like a scrape of a line-numbered viewer:
# the bare integers are viewer line numbers, indentation is lost, and the
# numbering skips 115, 119, 121, 123 and 125, so statements appear to be
# missing here. Restore them from the upstream file before running this.
114
repo = self.make_repository('.')
116
signatures = repo.signatures
117
revisions = repo.revisions
118
inventories = repo.inventories
120
# NOTE(review): the next three assertRaises calls have no callable argument
# in this view; their continuation lines appear to be lost.
self.assertRaises(errors.ObjectNotLocked,
122
self.assertRaises(errors.ObjectNotLocked,
124
self.assertRaises(errors.ObjectNotLocked,
126
# Writing through each versioned-files object must be refused as well.
self.assertRaises(errors.ObjectNotLocked,
127
signatures.add_lines, ('foo',), [], [])
128
self.assertRaises(errors.ObjectNotLocked,
129
revisions.add_lines, ('foo',), [], [])
130
self.assertRaises(errors.ObjectNotLocked,
131
inventories.add_lines, ('foo',), [], [])
133
def test__get_sink(self):
    """The repository's _get_sink() yields a vf_repository.StreamSink."""
    repository = self.make_repository('repo')
    self.assertIsInstance(repository._get_sink(), vf_repository.StreamSink)
138
def test_get_serializer_format(self):
    """get_serializer_format() equals the serializer's format_num."""
    repository = self.make_repository('.')
    # Use a name that does not shadow the ``format`` builtin.
    reported = repository.get_serializer_format()
    expected = repository._serializer.format_num
    self.assertEqual(expected, reported)
143
def test_add_revision_inventory_sha1(self):
# Adds one inventory to a reference repository to obtain its sha1, then
# adds a revision carrying the same inventory to a second repository and
# asserts that the stored revision's inventory_sha1 equals that value.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 148, 156, 158, 166
# and 167, so lines appear to be missing here (no lock call on `repo` is
# visible, unlike on `reference_repo`). Confirm against the upstream file.
144
inv = inventory.Inventory(revision_id='A')
145
inv.root.revision = 'A'
146
inv.root.file_id = 'fixed-root'
147
# Insert the inventory on its own to an identical repository, to get
# its sha1 (stored in inv_sha1 below; the original continuation line of
# this comment appears to be lost).
149
reference_repo = self.make_repository('reference_repo')
150
reference_repo.lock_write()
151
reference_repo.start_write_group()
152
inv_sha1 = reference_repo.add_inventory('A', inv, [])
153
# The write group is aborted, not committed: only the returned sha1 is kept.
reference_repo.abort_write_group()
154
reference_repo.unlock()
155
# Now insert a revision with this inventory, and it should get the same
# inventory sha1 (checked by the final assertion; the original continuation
# line of this comment appears to be lost).
157
repo = self.make_repository('repo')
159
repo.start_write_group()
160
# NOTE(review): root_id is not read again in the visible lines; the
# add_lines call below uses the literal 'fixed-root' instead.
root_id = inv.root.file_id
161
repo.texts.add_lines(('fixed-root', 'A'), [], [])
162
repo.add_revision('A', _mod_revision.Revision(
163
'A', committer='B', timestamp=0,
164
timezone=0, message='C'), inv=inv)
165
repo.commit_write_group()
168
# NOTE(review): assertEquals is a deprecated unittest alias of assertEqual.
self.assertEquals(inv_sha1, repo.get_revision('A').inventory_sha1)
171
def test_install_revisions(self):
# Commits and signs revision 'A' in a source repository, passes its
# (revision, tree, signature) triple to vf_repository.install_revisions
# for a second repository, and asserts that the second repository returns
# an equal revision and signature text.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 175, 179, 180 and
# 186. Only the addCleanup(...unlock) registrations are visible, so the
# matching lock calls appear to be missing; confirm against upstream.
172
wt = self.make_branch_and_tree('source')
173
wt.commit('A', allow_pointless=True, rev_id='A')
174
repo = wt.branch.repository
176
repo.start_write_group()
177
repo.sign_revision('A', gpg.LoopbackGPGStrategy(None))
178
repo.commit_write_group()
181
self.addCleanup(repo.unlock)
182
repo2 = self.make_repository('repo2')
183
# Collect everything install_revisions needs from the source repository.
revision = repo.get_revision('A')
184
tree = repo.revision_tree('A')
185
signature = repo.get_signature_text('A')
187
self.addCleanup(repo2.unlock)
188
vf_repository.install_revisions(repo2, [(revision, tree, signature)])
189
self.assertEqual(revision, repo2.get_revision('A'))
190
self.assertEqual(signature, repo2.get_signature_text('A'))
192
def test_attribute_text_store(self):
    """The repository exposes ``texts`` as a VersionedFiles instance."""
    working_tree = self.make_branch_and_tree('tree')
    texts = working_tree.branch.repository.texts
    self.assertIsInstance(texts, versionedfile.VersionedFiles)
199
def test_iter_inventories_is_ordered(self):
# Makes two commits, requests their inventories via iter_inventories in
# commit order, and asserts that each returned inventory has the matching
# revision_id and is an inventory.CommonInventory.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 200 and 204. The
# lock call that pairs with addCleanup(tree.unlock) is not visible;
# confirm against upstream.
201
tree = self.make_branch_and_tree('a')
202
first_revision = tree.commit('')
203
second_revision = tree.commit('')
205
self.addCleanup(tree.unlock)
206
revs = (first_revision, second_revision)
207
invs = tree.branch.repository.iter_inventories(revs)
208
# Pairing by position is what checks the ordering of the results.
for rev_id, inv in zip(revs, invs):
209
self.assertEqual(rev_id, inv.revision_id)
210
self.assertIsInstance(inv, inventory.CommonInventory)
212
def test_item_keys_introduced_by(self):
# Builds a tree with one file and one commit, tries to sign the revision,
# and asserts that item_keys_introduced_by(['rev_id']) produces the
# expected (kind, file_id, versions) entries.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 219, 221, 224-225,
# 228-229, 231, 234-237, 244, 247 and 258. Several statements (the `try:`
# for the except clause below, locking, and the start of a list
# comprehension) appear to be missing; confirm against upstream.
213
# Make a repo with one revision and one versioned file.
214
tree = self.make_branch_and_tree('t')
215
self.build_tree(['t/foo'])
216
tree.add('foo', 'file1')
217
tree.commit('message', rev_id='rev_id')
218
repo = tree.branch.repository
220
repo.start_write_group()
222
repo.sign_revision('rev_id', gpg.LoopbackGPGStrategy(None))
223
# NOTE(review): the body of this except clause and what looks like a
# following else branch are not visible; signature_texts is presumably
# assigned differently when signing is unsupported. Verify.
except errors.UnsupportedOperation:
226
signature_texts = ['rev_id']
227
repo.commit_write_group()
230
self.addCleanup(repo.unlock)
232
# Item keys will be in this order, for maximum convenience for
233
# generating data to insert into knit repository:
238
expected_item_keys = [
239
('file', 'file1', ['rev_id']),
240
('inventory', None, ['rev_id']),
241
('signatures', None, signature_texts),
242
('revisions', None, ['rev_id'])]
243
item_keys = list(repo.item_keys_introduced_by(['rev_id']))
245
# NOTE(review): the opening line of this comprehension is missing; it
# presumably rebuilds item_keys with each versions iterable turned into a
# list. Verify.
(kind, file_id, list(versions))
246
for (kind, file_id, versions) in item_keys]
248
if repo.supports_rich_root():
249
# Check for the root versioned file in the item_keys, then remove
250
# it from item_keys so we can compare that with
251
# expected_item_keys.
252
# Note that the file keys can be in any order, so this test is
253
# written to allow that.
254
inv = repo.get_inventory('rev_id')
255
root_item_key = ('file', inv.root.file_id, ['rev_id'])
256
self.assertTrue(root_item_key in item_keys)
257
item_keys.remove(root_item_key)
259
self.assertEqual(expected_item_keys, item_keys)
262
class TestCaseWithComplexRepository(TestCaseWithRepository):
# Test case whose visible setup code creates tree 'a', stores its bzrdir on
# self.bzrdir, adds an ('orphan',) entry to the repository's inventories,
# and commits rev1 to rev4, where rev3 and rev4 are committed after
# registering ghost parent ids.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 263, 265-266,
# 272-273, 275, 278, 280-281, 283-284 and 291. The `def setUp(self):` line
# and the try/except scaffolding appear to be missing; confirm upstream.
264
scenarios = all_repository_vf_format_scenarios()
267
# NOTE(review): from here on the statements presumably form the body of
# setUp (they call the parent setUp and use self).
super(TestCaseWithComplexRepository, self).setUp()
268
tree_a = self.make_branch_and_tree('a')
269
self.bzrdir = tree_a.branch.bzrdir
270
# add a corrupt inventory 'orphan'
271
# this may need some generalising for knits.
274
tree_a.branch.repository.start_write_group()
276
inv_file = tree_a.branch.repository.inventories
277
inv_file.add_lines(('orphan',), [], [])
279
# NOTE(review): commit_write_group and abort_write_group appear back to
# back here; they presumably sit in different branches of a missing
# try/except. Verify.
tree_a.branch.repository.commit_write_group()
282
tree_a.branch.repository.abort_write_group()
285
# add a real revision 'rev1'
286
tree_a.commit('rev1', rev_id='rev1', allow_pointless=True)
287
# add a real revision 'rev2' based on rev1
288
tree_a.commit('rev2', rev_id='rev2', allow_pointless=True)
289
# add a reference to a ghost
290
tree_a.add_parent_tree_id('ghost1')
292
tree_a.commit('rev3', rev_id='rev3', allow_pointless=True)
293
# RevisionNotPresent is converted into TestNotApplicable below.
except errors.RevisionNotPresent:
294
raise tests.TestNotApplicable(
295
"Cannot test with ghosts for this format.")
296
# add another reference to a ghost, and a second ghost.
297
tree_a.add_parent_tree_id('ghost1')
298
tree_a.add_parent_tree_id('ghost2')
299
tree_a.commit('rev4', rev_id='rev4', allow_pointless=True)
301
def test_revision_trees(self):
    """revision_trees() agrees with per-id revision_tree() lookups."""
    repo = self.bzrdir.open_repository()
    repo.lock_read()
    self.addCleanup(repo.unlock)
    rev_ids = ['rev1', 'rev2', 'rev3', 'rev4']
    batched = list(repo.revision_trees(rev_ids))
    individual = []
    for rev_id in rev_ids:
        individual.append(repo.revision_tree(rev_id))
    self.assertEqual(len(batched), len(individual))
    # Each pair of trees must show no changes relative to one another.
    for batch_tree, single_tree in zip(batched, individual):
        delta = single_tree.changes_from(batch_tree)
        self.assertFalse(delta.has_changed())
312
def test_get_deltas_for_revisions(self):
# Loads revisions rev1 to rev4 and asserts that get_deltas_for_revisions
# returns the same deltas as per-revision get_revision_delta calls.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 320, so the second
# comprehension below has no iterable or closing bracket in this view.
313
repository = self.bzrdir.open_repository()
314
repository.lock_read()
315
self.addCleanup(repository.unlock)
316
revisions = [repository.get_revision(r) for r in
317
['rev1', 'rev2', 'rev3', 'rev4']]
318
deltas1 = list(repository.get_deltas_for_revisions(revisions))
319
# NOTE(review): presumably iterates over `revisions` (r.revision_id is
# read); the continuation line is missing. Verify.
deltas2 = [repository.get_revision_delta(r.revision_id) for r in
321
self.assertEqual(deltas1, deltas2)
323
def test_all_revision_ids(self):
    """all_revision_ids() reports exactly rev1, rev2, rev3 and rev4."""
    expected = set(['rev1', 'rev2', 'rev3', 'rev4'])
    repository = self.bzrdir.open_repository()
    found = set(repository.all_revision_ids())
    self.assertEqual(expected, found)
328
def test_get_ancestry_missing_revision(self):
    """get_ancestry('orphan') raises errors.NoSuchRevision."""
    # 'orphan' is a revision that is in some data but not fully installed.
    repository = self.bzrdir.open_repository()
    self.assertRaises(errors.NoSuchRevision,
                      repository.get_ancestry, 'orphan')
334
def test_get_unordered_ancestry(self):
    """get_ancestry('rev3') has the same members with topo_sorted=False."""
    repository = self.bzrdir.open_repository()
    default_members = set(repository.get_ancestry('rev3'))
    unsorted_members = set(
        repository.get_ancestry('rev3', topo_sorted=False))
    self.assertEqual(default_members, unsorted_members)
339
def test_reserved_id(self):
# Asserts that add_inventory, add_inventory_by_delta and add_revision raise
# errors.ReservedId when given the id 'reserved:' (the add_revision
# arguments are not visible in this view), then aborts the write group.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 341, 343, 349-350
# and 352-353. Locking, try/finally scaffolding and the arguments of the
# last assertRaises appear to be missing; confirm against upstream.
340
repo = self.make_repository('repository')
342
repo.start_write_group()
344
self.assertRaises(errors.ReservedId, repo.add_inventory,
345
'reserved:', None, None)
346
self.assertRaises(errors.ReservedId, repo.add_inventory_by_delta,
347
"foo", [], 'reserved:', None)
348
# NOTE(review): this call is unterminated here; its argument line is lost.
self.assertRaises(errors.ReservedId, repo.add_revision,
351
repo.abort_write_group()
355
class TestCaseWithCorruptRepository(TestCaseWithRepository):
# Test case whose visible setup code creates the repository
# 'inventory_with_unnecessary_ghost', adds an inventory and revision
# 'ghost' whose revision lists 'the_ghost' as parent, then adds an
# inventory and revision 'the_ghost', and checks the stored inventory
# parents of 'ghost'.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 356, 358-359, 362,
# 364, 376, 381, 391 and 397. The `def setUp(self):` line, locking, the
# `try:` for the except clause and the end of the assertSubset call appear
# to be missing; confirm against upstream.
357
scenarios = all_repository_vf_format_scenarios()
360
# NOTE(review): from here on the statements presumably form the body of
# setUp (they call the parent setUp and use self).
super(TestCaseWithCorruptRepository, self).setUp()
361
# an inventory with no parents, while the revision has parents.
363
repo = self.make_repository('inventory_with_unnecessary_ghost')
365
repo.start_write_group()
366
inv = inventory.Inventory(revision_id = 'ghost')
367
inv.root.revision = 'ghost'
368
# Rich-root repositories also get a text entry for the root.
if repo.supports_rich_root():
369
root_id = inv.root.file_id
370
repo.texts.add_lines((root_id, 'ghost'), [], [])
371
# The inventory is added with an empty parent list.
sha1 = repo.add_inventory('ghost', inv, [])
372
rev = _mod_revision.Revision(
373
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
374
message="Message", inventory_sha1=sha1, revision_id='ghost')
375
# The revision, unlike its inventory, names 'the_ghost' as a parent.
rev.parent_ids = ['the_ghost']
377
repo.add_revision('ghost', rev)
378
# These two errors are converted into TestNotApplicable below.
except (errors.NoSuchRevision, errors.RevisionNotPresent):
379
raise tests.TestNotApplicable(
380
"Cannot test with ghosts for this format.")
382
inv = inventory.Inventory(revision_id = 'the_ghost')
383
inv.root.revision = 'the_ghost'
384
if repo.supports_rich_root():
385
root_id = inv.root.file_id
386
repo.texts.add_lines((root_id, 'the_ghost'), [], [])
387
sha1 = repo.add_inventory('the_ghost', inv, [])
388
rev = _mod_revision.Revision(
389
timestamp=0, timezone=None, committer="Foo Bar <foo@example.com>",
390
message="Message", inventory_sha1=sha1, revision_id='the_ghost')
392
repo.add_revision('the_ghost', rev)
393
# check its setup usefully
394
inv_weave = repo.inventories
395
possible_parents = (None, (('ghost',),))
396
# NOTE(review): this call is unterminated here; its second argument
# (presumably possible_parents) is on a missing line. Verify.
self.assertSubset(inv_weave.get_parent_map([('ghost',)])[('ghost',)],
398
repo.commit_write_group()
401
def test_corrupt_revision_access_asserts_if_reported_wrong(self):
# Opens the 'inventory_with_unnecessary_ghost' repository, compares
# get_ancestry('ghost') with [None, 'the_ghost', 'ghost'], and has visible
# handling for errors.CorruptRepository around that check.
# NOTE(review): viewer-scrape residue: bare integers are viewer line
# numbers, indentation is lost, and the numbering skips 405, 410 and 412.
# The `try:` line and the bodies of the except clause and of the
# `if not reported_wrong:` branch appear to be missing; the nesting of the
# final assertRaises cannot be determined from this view.
402
repo_url = self.get_url('inventory_with_unnecessary_ghost')
403
repo = _mod_repository.Repository.open(repo_url)
404
reported_wrong = False
406
if repo.get_ancestry('ghost') != [None, 'the_ghost', 'ghost']:
407
reported_wrong = True
408
except errors.CorruptRepository:
409
# caught the bad data:
411
if not reported_wrong:
413
self.assertRaises(errors.CorruptRepository, repo.get_revision, 'ghost')
415
def test_corrupt_revision_get_revision_reconcile(self):
    """Call get_revision_reconcile('ghost') on the prepared repository."""
    url = self.get_url('inventory_with_unnecessary_ghost')
    repository = _mod_repository.Repository.open(url)
    repository.get_revision_reconcile('ghost')