15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
from bzrlib import errors, chk_map, inventory, osutils
19
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
20
28
InventoryDirectory, InventoryEntry, TreeReference)
21
from bzrlib.tests import TestCase, TestCaseWithTransport
29
from bzrlib.tests import (
31
TestCaseWithTransport,
34
split_suite_by_condition,
36
from bzrlib.tests.per_workingtree import workingtree_formats
39
def load_tests(standard_tests, module, loader):
40
"""Parameterise some inventory tests."""
41
to_adapt, result = split_suite_by_condition(standard_tests,
42
condition_isinstance(TestDeltaApplication))
44
('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
# Working tree basis delta application
47
# Repository add_inv_by_delta.
48
# Reduce form of the per_repository test logic - that logic needs to be
49
# be able to get /just/ repositories whereas these tests are fine with
50
# just creating trees.
52
for _, format in repository.format_registry.iteritems():
53
scenarios.append((str(format.__name__), {
54
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
for format in workingtree_formats():
58
(str(format.__class__.__name__) + ".update_basis_by_delta", {
59
'apply_delta':apply_inventory_WT_basis,
62
(str(format.__class__.__name__) + ".apply_inventory_delta", {
63
'apply_delta':apply_inventory_WT,
65
return multiply_tests(to_adapt, scenarios, result)
68
def apply_inventory_Inventory(self, basis, delta):
69
"""Apply delta to basis and return the result.
71
:param basis: An inventory to be used as the basis.
72
:param delta: The inventory delta to apply:
73
:return: An inventory resulting from the application.
75
basis.apply_delta(delta)
79
def apply_inventory_WT(self, basis, delta):
80
"""Apply delta to basis and return the result.
82
This sets the tree state to be basis, and then calls apply_inventory_delta.
84
:param basis: An inventory to be used as the basis.
85
:param delta: The inventory delta to apply:
86
:return: An inventory resulting from the application.
88
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
89
control.create_repository()
90
control.create_branch()
91
tree = self.format.initialize(control)
94
tree._write_inventory(basis)
97
# Fresh object, reads disk again.
98
tree = tree.bzrdir.open_workingtree()
101
tree.apply_inventory_delta(delta)
104
# reload tree - ensure we get what was written.
105
tree = tree.bzrdir.open_workingtree()
107
self.addCleanup(tree.unlock)
108
# One could add 'tree._validate' here but that would cause 'early' failues
109
# as far as higher level code is concerned. Possibly adding an
110
# expect_fail parameter to this function and if that is False then do a
112
return tree.inventory
115
def apply_inventory_WT_basis(self, basis, delta):
116
"""Apply delta to basis and return the result.
118
This sets the parent and then calls update_basis_by_delta.
119
It also puts the basis in the repository under both 'basis' and 'result' to
120
allow safety checks made by the WT to succeed, and finally ensures that all
121
items in the delta with a new path are present in the WT before calling
122
update_basis_by_delta.
124
:param basis: An inventory to be used as the basis.
125
:param delta: The inventory delta to apply:
126
:return: An inventory resulting from the application.
128
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
129
control.create_repository()
130
control.create_branch()
131
tree = self.format.initialize(control)
134
repo = tree.branch.repository
135
repo.start_write_group()
137
rev = revision.Revision('basis', timestamp=0, timezone=None,
138
message="", committer="foo@example.com")
139
basis.revision_id = 'basis'
140
repo.add_revision('basis', rev, basis)
141
# Add a revision for the result, with the basis content -
142
# update_basis_by_delta doesn't check that the delta results in
143
# result, and we want inconsistent deltas to get called on the
144
# tree, or else the code isn't actually checked.
145
rev = revision.Revision('result', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = 'result'
148
repo.add_revision('result', rev, basis)
150
repo.abort_write_group()
153
repo.commit_write_group()
154
# Set the basis state as the trees current state
155
tree._write_inventory(basis)
156
# This reads basis from the repo and puts it into the tree's local
157
# cache, if it has one.
158
tree.set_parent_ids(['basis'])
161
for old, new, id, entry in delta:
162
if None in (new, entry):
164
paths[new] = (entry.file_id, entry.kind)
165
parents.add(osutils.dirname(new))
166
parents = osutils.minimum_path_selection(parents)
168
# Put place holders in the tree to permit adding the other entries.
169
for pos, parent in enumerate(parents):
170
if not tree.path2id(parent):
171
# add a synthetic directory in the tree so we can can put the
172
# tree0 entries in place for dirstate.
173
tree.add([parent], ["id%d" % pos], ["directory"])
175
# Many deltas may cause this mini-apply to fail, but we want to see what
176
# the delta application code says, not the prep that we do to deal with
177
# limitations of dirstate's update_basis code.
178
for path, (file_id, kind) in sorted(paths.items()):
180
tree.add([path], [file_id], [kind])
181
except (KeyboardInterrupt, SystemExit):
187
# Fresh lock, reads disk again.
190
tree.update_basis_by_delta('result', delta)
193
# reload tree - ensure we get what was written.
194
tree = tree.bzrdir.open_workingtree()
195
basis_tree = tree.basis_tree()
196
basis_tree.lock_read()
197
self.addCleanup(basis_tree.unlock)
198
# Note, that if the tree does not have a local cache, the trick above of
199
# setting the result as the basis, will come back to bite us. That said,
200
# all the implementations in bzr do have a local cache.
201
return basis_tree.inventory
204
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
205
"""Apply delta to basis and return the result.
207
This inserts basis as a whole inventory and then uses
208
add_inventory_by_delta to add delta.
210
:param basis: An inventory to be used as the basis.
211
:param delta: The inventory delta to apply:
212
:return: An inventory resulting from the application.
214
format = self.format()
215
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
216
repo = format.initialize(control)
219
repo.start_write_group()
221
rev = revision.Revision('basis', timestamp=0, timezone=None,
222
message="", committer="foo@example.com")
223
basis.revision_id = 'basis'
224
repo.add_revision('basis', rev, basis)
226
repo.abort_write_group()
229
repo.commit_write_group()
234
repo.start_write_group()
236
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
239
repo.abort_write_group()
242
repo.commit_write_group()
245
# Fresh lock, reads disk again.
246
repo = repo.bzrdir.open_repository()
248
self.addCleanup(repo.unlock)
249
return repo.get_inventory('result')
252
class TestDeltaApplication(TestCaseWithTransport):
254
def get_empty_inventory(self, reference_inv=None):
255
"""Get an empty inventory.
257
Note that tests should not depend on the revision of the root for
258
setting up test conditions, as it has to be flexible to accomodate non
259
rich root repositories.
261
:param reference_inv: If not None, get the revision for the root from
262
this inventory. This is useful for dealing with older repositories
263
that routinely discarded the root entry data. If None, the root's
264
revision is set to 'basis'.
266
inv = inventory.Inventory()
267
if reference_inv is not None:
268
inv.root.revision = reference_inv.root.revision
270
inv.root.revision = 'basis'
273
def test_empty_delta(self):
274
inv = self.get_empty_inventory()
276
inv = self.apply_delta(self, inv, delta)
277
inv2 = self.get_empty_inventory(inv)
278
self.assertEqual([], inv2._make_delta(inv))
280
def test_None_file_id(self):
281
inv = self.get_empty_inventory()
282
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
283
dir1.revision = 'result'
284
delta = [(None, u'dir1', None, dir1)]
285
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
288
def test_unicode_file_id(self):
289
inv = self.get_empty_inventory()
290
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
291
dir1.revision = 'result'
292
delta = [(None, u'dir1', dir1.file_id, dir1)]
293
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
296
def test_repeated_file_id(self):
297
inv = self.get_empty_inventory()
298
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
299
file1.revision = 'result'
302
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
303
file2.revision = 'result'
306
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
307
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
310
def test_repeated_new_path(self):
311
inv = self.get_empty_inventory()
312
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
313
file1.revision = 'result'
316
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
317
file2.revision = 'result'
320
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
321
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
324
def test_repeated_old_path(self):
325
inv = self.get_empty_inventory()
326
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
327
file1.revision = 'result'
330
# We can't *create* a source inventory with the same path, but
331
# a badly generated partial delta might claim the same source twice.
332
# This would be buggy in two ways: the path is repeated in the delta,
333
# And the path for one of the file ids doesn't match the source
334
# location. Alternatively, we could have a repeated fileid, but that
335
# is separately checked for.
336
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
337
file2.revision = 'result'
342
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
343
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
346
def test_mismatched_id_entry_id(self):
347
inv = self.get_empty_inventory()
348
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
349
file1.revision = 'result'
352
delta = [(None, u'path', 'id', file1)]
353
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
356
def test_mismatched_new_path_entry_None(self):
357
inv = self.get_empty_inventory()
358
delta = [(None, u'path', 'id', None)]
359
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
362
def test_mismatched_new_path_None_entry(self):
363
inv = self.get_empty_inventory()
364
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
365
file1.revision = 'result'
368
delta = [(u"path", None, 'id1', file1)]
369
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
372
def test_parent_is_not_directory(self):
373
inv = self.get_empty_inventory()
374
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
375
file1.revision = 'result'
378
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
379
file2.revision = 'result'
383
delta = [(None, u'path/path2', 'id2', file2)]
384
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
def test_parent_is_missing(self):
388
inv = self.get_empty_inventory()
389
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
390
file2.revision = 'result'
393
delta = [(None, u'path/path2', 'id2', file2)]
394
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
397
def test_new_parent_path_has_wrong_id(self):
398
inv = self.get_empty_inventory()
399
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
400
parent1.revision = 'result'
401
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
402
parent2.revision = 'result'
403
file1 = inventory.InventoryFile('id', 'path', 'p-2')
404
file1.revision = 'result'
409
# This delta claims that file1 is at dir/path, but actually its at
410
# dir2/path if you follow the inventory parent structure.
411
delta = [(None, u'dir/path', 'id', file1)]
412
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
415
def test_old_parent_path_is_wrong(self):
416
inv = self.get_empty_inventory()
417
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
418
parent1.revision = 'result'
419
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
420
parent2.revision = 'result'
421
file1 = inventory.InventoryFile('id', 'path', 'p-2')
422
file1.revision = 'result'
428
# This delta claims that file1 was at dir/path, but actually it was at
429
# dir2/path if you follow the inventory parent structure.
430
delta = [(u'dir/path', None, 'id', None)]
431
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
434
def test_old_parent_path_is_for_other_id(self):
435
inv = self.get_empty_inventory()
436
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
437
parent1.revision = 'result'
438
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
439
parent2.revision = 'result'
440
file1 = inventory.InventoryFile('id', 'path', 'p-2')
441
file1.revision = 'result'
444
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
445
file2.revision = 'result'
452
# This delta claims that file1 was at dir/path, but actually it was at
453
# dir2/path if you follow the inventory parent structure. At dir/path
454
# is another entry we should not delete.
455
delta = [(u'dir/path', None, 'id', None)]
456
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
459
def test_add_existing_id_new_path(self):
460
inv = self.get_empty_inventory()
461
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
462
parent1.revision = 'result'
463
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
464
parent2.revision = 'result'
466
delta = [(None, u'dir2', 'p-1', parent2)]
467
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
470
def test_add_new_id_existing_path(self):
471
inv = self.get_empty_inventory()
472
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
473
parent1.revision = 'result'
474
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
475
parent2.revision = 'result'
477
delta = [(None, u'dir1', 'p-2', parent2)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_remove_dir_leaving_dangling_child(self):
482
inv = self.get_empty_inventory()
483
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
484
dir1.revision = 'result'
485
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
486
dir2.revision = 'result'
487
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
488
dir3.revision = 'result'
492
delta = [(u'dir1', None, 'p-1', None),
493
(u'dir1/child2', None, 'p-3', None)]
494
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
24
498
class TestInventoryEntry(TestCase):