29
from bzrlib.inventory import (
37
mutable_inventory_from_tree,
28
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
29
InventoryDirectory, InventoryEntry, TreeReference)
39
30
from bzrlib.tests import (
41
32
TestCaseWithTransport,
35
split_suite_by_condition,
43
from bzrlib.tests.scenarios import load_tests_apply_scenarios
46
load_tests = load_tests_apply_scenarios
49
def delta_application_scenarios():
37
from bzrlib.tests.per_workingtree import workingtree_formats
40
def load_tests(standard_tests, module, loader):
41
"""Parameterise some inventory tests."""
42
to_adapt, result = split_suite_by_condition(standard_tests,
43
condition_isinstance(TestDeltaApplication))
51
45
('Inventory', {'apply_delta':apply_inventory_Inventory}),
57
51
# just creating trees.
59
53
for _, format in repository.format_registry.iteritems():
60
if format.supports_full_versioned_files:
61
scenarios.append((str(format.__name__), {
62
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
64
for format in workingtree.format_registry._get_all():
65
repo_fmt = format._matchingbzrdir.repository_format
66
if not repo_fmt.supports_full_versioned_files:
54
scenarios.append((str(format.__name__), {
55
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
57
for format in workingtree_formats():
69
59
(str(format.__class__.__name__) + ".update_basis_by_delta", {
70
60
'apply_delta':apply_inventory_WT_basis,
125
114
tree = tree.bzrdir.open_workingtree()
127
116
self.addCleanup(tree.unlock)
128
if not invalid_delta:
117
# One could add 'tree._validate' here but that would cause 'early' failues
118
# as far as higher level code is concerned. Possibly adding an
119
# expect_fail parameter to this function and if that is False then do a
130
121
return tree.inventory
133
def _create_repo_revisions(repo, basis, delta, invalid_delta):
134
repo.start_write_group()
136
rev = revision.Revision('basis', timestamp=0, timezone=None,
137
message="", committer="foo@example.com")
138
basis.revision_id = 'basis'
139
create_texts_for_inv(repo, basis)
140
repo.add_revision('basis', rev, basis)
142
# We don't want to apply the delta to the basis, because we expect
143
# the delta is invalid.
145
result_inv.revision_id = 'result'
146
target_entries = None
148
result_inv = basis.create_by_apply_delta(delta, 'result')
149
create_texts_for_inv(repo, result_inv)
150
target_entries = list(result_inv.iter_entries_by_dir())
151
rev = revision.Revision('result', timestamp=0, timezone=None,
152
message="", committer="foo@example.com")
153
repo.add_revision('result', rev, result_inv)
154
repo.commit_write_group()
156
repo.abort_write_group()
158
return target_entries
161
def _get_basis_entries(tree):
162
basis_tree = tree.basis_tree()
163
basis_tree.lock_read()
164
basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
166
return basis_tree_entries
169
def _populate_different_tree(tree, basis, delta):
170
"""Put all entries into tree, but at a unique location."""
173
tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
174
for path, ie in basis.iter_entries_by_dir():
175
if ie.file_id in added_ids:
177
# We want a unique path for each of these, we use the file-id
178
tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
179
added_ids.add(ie.file_id)
180
for old_path, new_path, file_id, ie in delta:
181
if file_id in added_ids:
183
tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
186
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
124
def apply_inventory_WT_basis(self, basis, delta):
187
125
"""Apply delta to basis and return the result.
189
127
This sets the parent and then calls update_basis_by_delta.
191
129
allow safety checks made by the WT to succeed, and finally ensures that all
192
130
items in the delta with a new path are present in the WT before calling
193
131
update_basis_by_delta.
195
133
:param basis: An inventory to be used as the basis.
196
134
:param delta: The inventory delta to apply:
197
135
:return: An inventory resulting from the application.
199
control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
137
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
200
138
control.create_repository()
201
139
control.create_branch()
202
tree = test.format.initialize(control)
140
tree = self.format.initialize(control)
203
141
tree.lock_write()
205
target_entries = _create_repo_revisions(tree.branch.repository, basis,
206
delta, invalid_delta)
143
repo = tree.branch.repository
144
repo.start_write_group()
146
rev = revision.Revision('basis', timestamp=0, timezone=None,
147
message="", committer="foo@example.com")
148
basis.revision_id = 'basis'
149
create_texts_for_inv(tree.branch.repository, basis)
150
repo.add_revision('basis', rev, basis)
151
# Add a revision for the result, with the basis content -
152
# update_basis_by_delta doesn't check that the delta results in
153
# result, and we want inconsistent deltas to get called on the
154
# tree, or else the code isn't actually checked.
155
rev = revision.Revision('result', timestamp=0, timezone=None,
156
message="", committer="foo@example.com")
157
basis.revision_id = 'result'
158
repo.add_revision('result', rev, basis)
159
repo.commit_write_group()
161
repo.abort_write_group()
207
163
# Set the basis state as the trees current state
208
164
tree._write_inventory(basis)
209
165
# This reads basis from the repo and puts it into the tree's local
210
166
# cache, if it has one.
211
167
tree.set_parent_ids(['basis'])
170
for old, new, id, entry in delta:
171
if None in (new, entry):
173
paths[new] = (entry.file_id, entry.kind)
174
parents.add(osutils.dirname(new))
175
parents = osutils.minimum_path_selection(parents)
177
# Put place holders in the tree to permit adding the other entries.
178
for pos, parent in enumerate(parents):
179
if not tree.path2id(parent):
180
# add a synthetic directory in the tree so we can can put the
181
# tree0 entries in place for dirstate.
182
tree.add([parent], ["id%d" % pos], ["directory"])
184
# Many deltas may cause this mini-apply to fail, but we want to see what
185
# the delta application code says, not the prep that we do to deal with
186
# limitations of dirstate's update_basis code.
187
for path, (file_id, kind) in sorted(paths.items()):
189
tree.add([path], [file_id], [kind])
190
except (KeyboardInterrupt, SystemExit):
214
196
# Fresh lock, reads disk again.
215
197
tree.lock_write()
217
199
tree.update_basis_by_delta('result', delta)
218
if not invalid_delta:
222
202
# reload tree - ensure we get what was written.
223
203
tree = tree.bzrdir.open_workingtree()
224
204
basis_tree = tree.basis_tree()
225
205
basis_tree.lock_read()
226
test.addCleanup(basis_tree.unlock)
227
basis_inv = basis_tree.inventory
229
basis_entries = list(basis_inv.iter_entries_by_dir())
230
test.assertEqual(target_entries, basis_entries)
234
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
206
self.addCleanup(basis_tree.unlock)
207
# Note, that if the tree does not have a local cache, the trick above of
208
# setting the result as the basis, will come back to bite us. That said,
209
# all the implementations in bzr do have a local cache.
210
return basis_tree.inventory
213
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
236
214
"""Apply delta to basis and return the result.
238
216
This inserts basis as a whole inventory and then uses
600
573
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
603
def test_add_file(self):
604
inv = self.get_empty_inventory()
605
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
606
file1.revision = 'result'
609
delta = [(None, u'path', 'file-id', file1)]
610
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
611
self.assertEqual('file-id', res_inv['file-id'].file_id)
613
def test_remove_file(self):
614
inv = self.get_empty_inventory()
615
file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
616
file1.revision = 'result'
620
delta = [(u'path', None, 'file-id', None)]
621
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
622
self.assertEqual(None, res_inv.path2id('path'))
623
self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
625
def test_rename_file(self):
626
inv = self.get_empty_inventory()
627
file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
629
file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
630
delta = [(u'path', 'path2', 'file-id', file2)]
631
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
632
self.assertEqual(None, res_inv.path2id('path'))
633
self.assertEqual('file-id', res_inv.path2id('path2'))
635
def test_replaced_at_new_path(self):
636
inv = self.get_empty_inventory()
637
file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
639
file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
640
delta = [(u'name', None, 'id1', None),
641
(None, u'name', 'id2', file2)]
642
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
643
self.assertEqual('id2', res_inv.path2id('name'))
645
def test_rename_dir(self):
646
inv = self.get_empty_inventory()
647
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
648
dir1.revision = 'basis'
649
file1 = self.make_file_ie(parent_id='dir-id')
652
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
653
dir2.revision = 'result'
654
delta = [('dir1', 'dir2', 'dir-id', dir2)]
655
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
656
# The file should be accessible under the new path
657
self.assertEqual('file-id', res_inv.path2id('dir2/name'))
659
def test_renamed_dir_with_renamed_child(self):
660
inv = self.get_empty_inventory()
661
dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
662
dir1.revision = 'basis'
663
file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
664
file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
668
dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
669
dir2.revision = 'result'
670
file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
671
delta = [('dir1', 'dir2', 'dir-id', dir2),
672
('dir1/name2', 'name2', 'file-id-2', file2b)]
673
res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
674
# The file should be accessible under the new path
675
self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
676
self.assertEqual(None, res_inv.path2id('dir2/name2'))
677
self.assertEqual('file-id-2', res_inv.path2id('name2'))
577
class TestInventory(TestCase):
679
579
def test_is_root(self):
680
580
"""Ensure our root-checking code is accurate."""
743
638
def test_file_has_text(self):
744
639
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
745
self.assertTrue(file.has_text())
640
self.failUnless(file.has_text())
747
642
def test_directory_has_text(self):
748
643
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
749
self.assertFalse(dir.has_text())
644
self.failIf(dir.has_text())
751
646
def test_link_has_text(self):
752
647
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
753
self.assertFalse(link.has_text())
648
self.failIf(link.has_text())
755
650
def test_make_entry(self):
756
651
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
1311
1206
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1312
1207
inv._bytes_to_utf8name_key(bytes))
1314
def make_basic_utf8_inventory(self):
1316
inv.revision_id = "revid"
1317
inv.root.revision = "rootrev"
1318
root_id = inv.root.file_id
1319
inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1320
inv["fileid"].revision = "filerev"
1321
inv["fileid"].text_sha1 = "ffff"
1322
inv["fileid"].text_size = 0
1323
inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1324
inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1325
inv["childid"].revision = "filerev"
1326
inv["childid"].text_sha1 = "ffff"
1327
inv["childid"].text_size = 0
1328
chk_bytes = self.get_chk_bytes()
1329
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1330
bytes = ''.join(chk_inv.to_lines())
1331
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1333
def test__preload_handles_utf8(self):
1334
new_inv = self.make_basic_utf8_inventory()
1335
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1336
self.assertFalse(new_inv._fully_cached)
1337
new_inv._preload_cache()
1339
sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1340
sorted(new_inv._fileid_to_entry_cache.keys()))
1341
ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1342
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1343
sorted(ie_root._children.keys()))
1344
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1345
self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1347
def test__preload_populates_cache(self):
1349
inv.revision_id = "revid"
1350
inv.root.revision = "rootrev"
1351
root_id = inv.root.file_id
1352
inv.add(InventoryFile("fileid", "file", root_id))
1353
inv["fileid"].revision = "filerev"
1354
inv["fileid"].executable = True
1355
inv["fileid"].text_sha1 = "ffff"
1356
inv["fileid"].text_size = 1
1357
inv.add(InventoryDirectory("dirid", "dir", root_id))
1358
inv.add(InventoryFile("childid", "child", "dirid"))
1359
inv["childid"].revision = "filerev"
1360
inv["childid"].executable = False
1361
inv["childid"].text_sha1 = "dddd"
1362
inv["childid"].text_size = 1
1363
chk_bytes = self.get_chk_bytes()
1364
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1365
bytes = ''.join(chk_inv.to_lines())
1366
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1367
self.assertEqual({}, new_inv._fileid_to_entry_cache)
1368
self.assertFalse(new_inv._fully_cached)
1369
new_inv._preload_cache()
1371
sorted([root_id, "fileid", "dirid", "childid"]),
1372
sorted(new_inv._fileid_to_entry_cache.keys()))
1373
self.assertTrue(new_inv._fully_cached)
1374
ie_root = new_inv._fileid_to_entry_cache[root_id]
1375
self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1376
ie_dir = new_inv._fileid_to_entry_cache['dirid']
1377
self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1379
def test__preload_handles_partially_evaluated_inventory(self):
1380
new_inv = self.make_basic_utf8_inventory()
1381
ie = new_inv[new_inv.root_id]
1382
self.assertIs(None, ie._children)
1383
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1384
sorted(ie.children.keys()))
1385
# Accessing .children loads _children
1386
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1387
sorted(ie._children.keys()))
1388
new_inv._preload_cache()
1390
self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1391
sorted(ie._children.keys()))
1392
ie_dir = new_inv["dirid"]
1393
self.assertEqual([u'ch\xefld'],
1394
sorted(ie_dir._children.keys()))
1397
1210
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1524
1337
inv = self.make_simple_inventory()
1525
1338
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1526
1339
'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1529
class TestMutableInventoryFromTree(TestCaseWithTransport):
1531
def test_empty(self):
1532
repository = self.make_repository('.')
1533
tree = repository.revision_tree(revision.NULL_REVISION)
1534
inv = mutable_inventory_from_tree(tree)
1535
self.assertEquals(revision.NULL_REVISION, inv.revision_id)
1536
self.assertEquals(0, len(inv))
1538
def test_some_files(self):
1539
wt = self.make_branch_and_tree('.')
1540
self.build_tree(['a'])
1541
wt.add(['a'], ['thefileid'])
1542
revid = wt.commit("commit")
1543
tree = wt.branch.repository.revision_tree(revid)
1544
inv = mutable_inventory_from_tree(tree)
1545
self.assertEquals(revid, inv.revision_id)
1546
self.assertEquals(2, len(inv))
1547
self.assertEquals("a", inv['thefileid'].name)
1548
# The inventory should be mutable and independent of
1550
self.assertFalse(tree.inventory['thefileid'].executable)
1551
inv['thefileid'].executable = True
1552
self.assertFalse(tree.inventory['thefileid'].executable)