1
# Copyright (C) 2005 by Canonical Ltd
1
# Copyright (C) 2005-2010 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from cStringIO import StringIO
20
from bzrlib.branch import Branch
21
from bzrlib.clone import copy_branch
22
import bzrlib.errors as errors
23
from bzrlib.diff import internal_diff
24
from bzrlib.inventory import Inventory, ROOT_ID
25
import bzrlib.inventory as inventory
26
from bzrlib.osutils import has_symlinks, rename
27
from bzrlib.tests import TestCase, TestCaseInTempDir
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
28
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
29
InventoryDirectory, InventoryEntry, TreeReference)
30
from bzrlib.tests import (
32
TestCaseWithTransport,
35
split_suite_by_condition,
37
from bzrlib.tests.per_workingtree import workingtree_formats
40
def load_tests(standard_tests, module, loader):
41
"""Parameterise some inventory tests."""
42
to_adapt, result = split_suite_by_condition(standard_tests,
43
condition_isinstance(TestDeltaApplication))
45
('Inventory', {'apply_delta':apply_inventory_Inventory}),
47
# Working tree basis delta application
48
# Repository add_inv_by_delta.
49
# Reduce form of the per_repository test logic - that logic needs to be
50
# be able to get /just/ repositories whereas these tests are fine with
51
# just creating trees.
53
for _, format in repository.format_registry.iteritems():
54
scenarios.append((str(format.__name__), {
55
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
57
for format in workingtree_formats():
59
(str(format.__class__.__name__) + ".update_basis_by_delta", {
60
'apply_delta':apply_inventory_WT_basis,
63
(str(format.__class__.__name__) + ".apply_inventory_delta", {
64
'apply_delta':apply_inventory_WT,
66
return multiply_tests(to_adapt, scenarios, result)
69
def create_texts_for_inv(repo, inv):
70
for path, ie in inv.iter_entries():
72
lines = ['a' * ie.text_size]
75
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
77
def apply_inventory_Inventory(self, basis, delta):
78
"""Apply delta to basis and return the result.
80
:param basis: An inventory to be used as the basis.
81
:param delta: The inventory delta to apply:
82
:return: An inventory resulting from the application.
84
basis.apply_delta(delta)
88
def apply_inventory_WT(self, basis, delta):
89
"""Apply delta to basis and return the result.
91
This sets the tree state to be basis, and then calls apply_inventory_delta.
93
:param basis: An inventory to be used as the basis.
94
:param delta: The inventory delta to apply:
95
:return: An inventory resulting from the application.
97
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
98
control.create_repository()
99
control.create_branch()
100
tree = self.format.initialize(control)
103
tree._write_inventory(basis)
106
# Fresh object, reads disk again.
107
tree = tree.bzrdir.open_workingtree()
110
tree.apply_inventory_delta(delta)
113
# reload tree - ensure we get what was written.
114
tree = tree.bzrdir.open_workingtree()
116
self.addCleanup(tree.unlock)
117
# One could add 'tree._validate' here but that would cause 'early' failues
118
# as far as higher level code is concerned. Possibly adding an
119
# expect_fail parameter to this function and if that is False then do a
121
return tree.inventory
124
def apply_inventory_WT_basis(self, basis, delta):
125
"""Apply delta to basis and return the result.
127
This sets the parent and then calls update_basis_by_delta.
128
It also puts the basis in the repository under both 'basis' and 'result' to
129
allow safety checks made by the WT to succeed, and finally ensures that all
130
items in the delta with a new path are present in the WT before calling
131
update_basis_by_delta.
133
:param basis: An inventory to be used as the basis.
134
:param delta: The inventory delta to apply:
135
:return: An inventory resulting from the application.
137
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
138
control.create_repository()
139
control.create_branch()
140
tree = self.format.initialize(control)
143
repo = tree.branch.repository
144
repo.start_write_group()
146
rev = revision.Revision('basis', timestamp=0, timezone=None,
147
message="", committer="foo@example.com")
148
basis.revision_id = 'basis'
149
create_texts_for_inv(tree.branch.repository, basis)
150
repo.add_revision('basis', rev, basis)
151
# Add a revision for the result, with the basis content -
152
# update_basis_by_delta doesn't check that the delta results in
153
# result, and we want inconsistent deltas to get called on the
154
# tree, or else the code isn't actually checked.
155
rev = revision.Revision('result', timestamp=0, timezone=None,
156
message="", committer="foo@example.com")
157
basis.revision_id = 'result'
158
repo.add_revision('result', rev, basis)
159
repo.commit_write_group()
161
repo.abort_write_group()
163
# Set the basis state as the trees current state
164
tree._write_inventory(basis)
165
# This reads basis from the repo and puts it into the tree's local
166
# cache, if it has one.
167
tree.set_parent_ids(['basis'])
170
for old, new, id, entry in delta:
171
if None in (new, entry):
173
paths[new] = (entry.file_id, entry.kind)
174
parents.add(osutils.dirname(new))
175
parents = osutils.minimum_path_selection(parents)
177
# Put place holders in the tree to permit adding the other entries.
178
for pos, parent in enumerate(parents):
179
if not tree.path2id(parent):
180
# add a synthetic directory in the tree so we can can put the
181
# tree0 entries in place for dirstate.
182
tree.add([parent], ["id%d" % pos], ["directory"])
184
# Many deltas may cause this mini-apply to fail, but we want to see what
185
# the delta application code says, not the prep that we do to deal with
186
# limitations of dirstate's update_basis code.
187
for path, (file_id, kind) in sorted(paths.items()):
189
tree.add([path], [file_id], [kind])
190
except (KeyboardInterrupt, SystemExit):
196
# Fresh lock, reads disk again.
199
tree.update_basis_by_delta('result', delta)
202
# reload tree - ensure we get what was written.
203
tree = tree.bzrdir.open_workingtree()
204
basis_tree = tree.basis_tree()
205
basis_tree.lock_read()
206
self.addCleanup(basis_tree.unlock)
207
# Note, that if the tree does not have a local cache, the trick above of
208
# setting the result as the basis, will come back to bite us. That said,
209
# all the implementations in bzr do have a local cache.
210
return basis_tree.inventory
213
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
214
"""Apply delta to basis and return the result.
216
This inserts basis as a whole inventory and then uses
217
add_inventory_by_delta to add delta.
219
:param basis: An inventory to be used as the basis.
220
:param delta: The inventory delta to apply:
221
:return: An inventory resulting from the application.
223
format = self.format()
224
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
225
repo = format.initialize(control)
228
repo.start_write_group()
230
rev = revision.Revision('basis', timestamp=0, timezone=None,
231
message="", committer="foo@example.com")
232
basis.revision_id = 'basis'
233
create_texts_for_inv(repo, basis)
234
repo.add_revision('basis', rev, basis)
235
repo.commit_write_group()
237
repo.abort_write_group()
243
repo.start_write_group()
245
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
248
repo.abort_write_group()
251
repo.commit_write_group()
254
# Fresh lock, reads disk again.
255
repo = repo.bzrdir.open_repository()
257
self.addCleanup(repo.unlock)
258
return repo.get_inventory('result')
261
class TestInventoryUpdates(TestCase):
263
def test_creation_from_root_id(self):
264
# iff a root id is passed to the constructor, a root directory is made
265
inv = inventory.Inventory(root_id='tree-root')
266
self.assertNotEqual(None, inv.root)
267
self.assertEqual('tree-root', inv.root.file_id)
269
def test_add_path_of_root(self):
270
# if no root id is given at creation time, there is no root directory
271
inv = inventory.Inventory(root_id=None)
272
self.assertIs(None, inv.root)
273
# add a root entry by adding its path
274
ie = inv.add_path("", "directory", "my-root")
275
ie.revision = 'test-rev'
276
self.assertEqual("my-root", ie.file_id)
277
self.assertIs(ie, inv.root)
279
def test_add_path(self):
280
inv = inventory.Inventory(root_id='tree_root')
281
ie = inv.add_path('hello', 'file', 'hello-id')
282
self.assertEqual('hello-id', ie.file_id)
283
self.assertEqual('file', ie.kind)
286
"""Make sure copy() works and creates a deep copy."""
287
inv = inventory.Inventory(root_id='some-tree-root')
288
ie = inv.add_path('hello', 'file', 'hello-id')
290
inv.root.file_id = 'some-new-root'
292
self.assertEqual('some-tree-root', inv2.root.file_id)
293
self.assertEqual('hello', inv2['hello-id'].name)
295
def test_copy_empty(self):
296
"""Make sure an empty inventory can be copied."""
297
inv = inventory.Inventory(root_id=None)
299
self.assertIs(None, inv2.root)
301
def test_copy_copies_root_revision(self):
302
"""Make sure the revision of the root gets copied."""
303
inv = inventory.Inventory(root_id='someroot')
304
inv.root.revision = 'therev'
306
self.assertEquals('someroot', inv2.root.file_id)
307
self.assertEquals('therev', inv2.root.revision)
309
def test_create_tree_reference(self):
310
inv = inventory.Inventory('tree-root-123')
311
inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
312
revision='rev', reference_revision='rev2'))
314
def test_error_encoding(self):
315
inv = inventory.Inventory('tree-root')
316
inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
317
e = self.assertRaises(errors.InconsistentDelta, inv.add,
318
InventoryFile('b-id', u'\u1234', 'tree-root'))
319
self.assertContainsRe(str(e), r'\\u1234')
321
def test_add_recursive(self):
322
parent = InventoryDirectory('src-id', 'src', 'tree-root')
323
child = InventoryFile('hello-id', 'hello.c', 'src-id')
324
parent.children[child.file_id] = child
325
inv = inventory.Inventory('tree-root')
327
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
331
class TestDeltaApplication(TestCaseWithTransport):
333
def get_empty_inventory(self, reference_inv=None):
334
"""Get an empty inventory.
336
Note that tests should not depend on the revision of the root for
337
setting up test conditions, as it has to be flexible to accomodate non
338
rich root repositories.
340
:param reference_inv: If not None, get the revision for the root from
341
this inventory. This is useful for dealing with older repositories
342
that routinely discarded the root entry data. If None, the root's
343
revision is set to 'basis'.
345
inv = inventory.Inventory()
346
if reference_inv is not None:
347
inv.root.revision = reference_inv.root.revision
349
inv.root.revision = 'basis'
352
def test_empty_delta(self):
353
inv = self.get_empty_inventory()
355
inv = self.apply_delta(self, inv, delta)
356
inv2 = self.get_empty_inventory(inv)
357
self.assertEqual([], inv2._make_delta(inv))
359
def test_None_file_id(self):
360
inv = self.get_empty_inventory()
361
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
362
dir1.revision = 'result'
363
delta = [(None, u'dir1', None, dir1)]
364
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
367
def test_unicode_file_id(self):
368
inv = self.get_empty_inventory()
369
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
370
dir1.revision = 'result'
371
delta = [(None, u'dir1', dir1.file_id, dir1)]
372
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
375
def test_repeated_file_id(self):
376
inv = self.get_empty_inventory()
377
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
378
file1.revision = 'result'
381
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
382
file2.revision = 'result'
385
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
386
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
389
def test_repeated_new_path(self):
390
inv = self.get_empty_inventory()
391
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
392
file1.revision = 'result'
395
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
396
file2.revision = 'result'
399
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
400
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
403
def test_repeated_old_path(self):
404
inv = self.get_empty_inventory()
405
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
406
file1.revision = 'result'
409
# We can't *create* a source inventory with the same path, but
410
# a badly generated partial delta might claim the same source twice.
411
# This would be buggy in two ways: the path is repeated in the delta,
412
# And the path for one of the file ids doesn't match the source
413
# location. Alternatively, we could have a repeated fileid, but that
414
# is separately checked for.
415
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
416
file2.revision = 'result'
421
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
422
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
425
def test_mismatched_id_entry_id(self):
426
inv = self.get_empty_inventory()
427
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
428
file1.revision = 'result'
431
delta = [(None, u'path', 'id', file1)]
432
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
435
def test_mismatched_new_path_entry_None(self):
436
inv = self.get_empty_inventory()
437
delta = [(None, u'path', 'id', None)]
438
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
441
def test_mismatched_new_path_None_entry(self):
442
inv = self.get_empty_inventory()
443
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
444
file1.revision = 'result'
447
delta = [(u"path", None, 'id1', file1)]
448
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
451
def test_parent_is_not_directory(self):
452
inv = self.get_empty_inventory()
453
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
454
file1.revision = 'result'
457
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
458
file2.revision = 'result'
462
delta = [(None, u'path/path2', 'id2', file2)]
463
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
466
def test_parent_is_missing(self):
467
inv = self.get_empty_inventory()
468
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
469
file2.revision = 'result'
472
delta = [(None, u'path/path2', 'id2', file2)]
473
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
476
def test_new_parent_path_has_wrong_id(self):
477
inv = self.get_empty_inventory()
478
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
479
parent1.revision = 'result'
480
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
481
parent2.revision = 'result'
482
file1 = inventory.InventoryFile('id', 'path', 'p-2')
483
file1.revision = 'result'
488
# This delta claims that file1 is at dir/path, but actually its at
489
# dir2/path if you follow the inventory parent structure.
490
delta = [(None, u'dir/path', 'id', file1)]
491
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
494
def test_old_parent_path_is_wrong(self):
495
inv = self.get_empty_inventory()
496
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
497
parent1.revision = 'result'
498
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
499
parent2.revision = 'result'
500
file1 = inventory.InventoryFile('id', 'path', 'p-2')
501
file1.revision = 'result'
507
# This delta claims that file1 was at dir/path, but actually it was at
508
# dir2/path if you follow the inventory parent structure.
509
delta = [(u'dir/path', None, 'id', None)]
510
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
513
def test_old_parent_path_is_for_other_id(self):
514
inv = self.get_empty_inventory()
515
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
516
parent1.revision = 'result'
517
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
518
parent2.revision = 'result'
519
file1 = inventory.InventoryFile('id', 'path', 'p-2')
520
file1.revision = 'result'
523
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
524
file2.revision = 'result'
531
# This delta claims that file1 was at dir/path, but actually it was at
532
# dir2/path if you follow the inventory parent structure. At dir/path
533
# is another entry we should not delete.
534
delta = [(u'dir/path', None, 'id', None)]
535
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
538
def test_add_existing_id_new_path(self):
539
inv = self.get_empty_inventory()
540
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
541
parent1.revision = 'result'
542
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
543
parent2.revision = 'result'
545
delta = [(None, u'dir2', 'p-1', parent2)]
546
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
549
def test_add_new_id_existing_path(self):
550
inv = self.get_empty_inventory()
551
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
552
parent1.revision = 'result'
553
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
554
parent2.revision = 'result'
556
delta = [(None, u'dir1', 'p-2', parent2)]
557
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
560
def test_remove_dir_leaving_dangling_child(self):
561
inv = self.get_empty_inventory()
562
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
563
dir1.revision = 'result'
564
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
565
dir2.revision = 'result'
566
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
567
dir3.revision = 'result'
571
delta = [(u'dir1', None, 'p-1', None),
572
(u'dir1/child2', None, 'p-3', None)]
573
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
30
577
class TestInventory(TestCase):
32
def test_is_within(self):
33
from bzrlib.osutils import is_inside_any
35
SRC_FOO_C = os.path.join('src', 'foo.c')
36
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
40
self.assert_(is_inside_any(dirs, fn))
42
for dirs, fn in [(['src'], 'srccontrol'),
43
(['src'], 'srccontrol/foo')]:
44
self.assertFalse(is_inside_any(dirs, fn))
47
"""Test detection of files within selected directories."""
50
for args in [('src', 'directory', 'src-id'),
51
('doc', 'directory', 'doc-id'),
52
('src/hello.c', 'file'),
53
('src/bye.c', 'file', 'bye-id'),
54
('Makefile', 'file')]:
57
self.assertEqual(inv.path2id('src'), 'src-id')
58
self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
60
self.assert_('src-id' in inv)
63
def test_version(self):
64
"""Inventory remembers the text's version."""
66
ie = inv.add_path('foo.txt', 'file')
579
def test_is_root(self):
580
"""Ensure our root-checking code is accurate."""
581
inv = inventory.Inventory('TREE_ROOT')
582
self.assertTrue(inv.is_root('TREE_ROOT'))
583
self.assertFalse(inv.is_root('booga'))
584
inv.root.file_id = 'booga'
585
self.assertFalse(inv.is_root('TREE_ROOT'))
586
self.assertTrue(inv.is_root('booga'))
587
# works properly even if no root is set
589
self.assertFalse(inv.is_root('TREE_ROOT'))
590
self.assertFalse(inv.is_root('booga'))
592
def test_entries_for_empty_inventory(self):
593
"""Test that entries() will not fail for an empty inventory"""
594
inv = Inventory(root_id=None)
595
self.assertEqual([], inv.entries())
70
598
class TestInventoryEntry(TestCase):
132
652
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
133
653
self.failIf(link.has_text())
136
class TestEntryDiffing(TestCaseInTempDir):
139
super(TestEntryDiffing, self).setUp()
140
self.branch = Branch.initialize(u'.')
141
self.wt = self.branch.working_tree()
142
print >> open('file', 'wb'), 'foo'
143
self.branch.working_tree().add(['file'], ['fileid'])
145
os.symlink('target1', 'symlink')
146
self.branch.working_tree().add(['symlink'], ['linkid'])
147
self.wt.commit('message_1', rev_id = '1')
148
print >> open('file', 'wb'), 'bar'
151
os.symlink('target2', 'symlink')
152
self.tree_1 = self.branch.revision_tree('1')
153
self.inv_1 = self.branch.get_inventory('1')
154
self.file_1 = self.inv_1['fileid']
155
self.tree_2 = self.branch.working_tree()
156
self.inv_2 = self.tree_2.read_working_inventory()
157
self.file_2 = self.inv_2['fileid']
159
self.link_1 = self.inv_1['linkid']
160
self.link_2 = self.inv_2['linkid']
162
def test_file_diff_deleted(self):
164
self.file_1.diff(internal_diff,
165
"old_label", self.tree_1,
166
"/dev/null", None, None,
168
self.assertEqual(output.getvalue(), "--- old_label\t\n"
174
def test_file_diff_added(self):
176
self.file_1.diff(internal_diff,
177
"new_label", self.tree_1,
178
"/dev/null", None, None,
179
output, reverse=True)
180
self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
186
def test_file_diff_changed(self):
188
self.file_1.diff(internal_diff,
189
"/dev/null", self.tree_1,
190
"new_label", self.file_2, self.tree_2,
192
self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
199
def test_link_diff_deleted(self):
200
if not has_symlinks():
203
self.link_1.diff(internal_diff,
204
"old_label", self.tree_1,
205
"/dev/null", None, None,
207
self.assertEqual(output.getvalue(),
208
"=== target was 'target1'\n")
210
def test_link_diff_added(self):
211
if not has_symlinks():
214
self.link_1.diff(internal_diff,
215
"new_label", self.tree_1,
216
"/dev/null", None, None,
217
output, reverse=True)
218
self.assertEqual(output.getvalue(),
219
"=== target is 'target1'\n")
221
def test_link_diff_changed(self):
222
if not has_symlinks():
225
self.link_1.diff(internal_diff,
226
"/dev/null", self.tree_1,
227
"new_label", self.link_2, self.tree_2,
229
self.assertEqual(output.getvalue(),
230
"=== target changed 'target1' => 'target2'\n")
233
class TestSnapshot(TestCaseInTempDir):
236
# for full testing we'll need a branch
237
# with a subdir to test parent changes.
238
# and a file, link and dir under that.
239
# but right now I only need one attribute
240
# to change, and then test merge patterns
241
# with fake parent entries.
242
super(TestSnapshot, self).setUp()
243
self.branch = Branch.initialize(u'.')
244
self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
245
self.branch.working_tree().add(['subdir', 'subdir/file'],
249
self.wt = self.branch.working_tree()
250
self.wt.commit('message_1', rev_id = '1')
251
self.tree_1 = self.branch.revision_tree('1')
252
self.inv_1 = self.branch.get_inventory('1')
253
self.file_1 = self.inv_1['fileid']
254
self.work_tree = self.branch.working_tree()
255
self.file_active = self.work_tree.inventory['fileid']
257
def test_snapshot_new_revision(self):
258
# This tests that a simple commit with no parents makes a new
259
# revision value in the inventory entry
260
self.file_active.snapshot('2', 'subdir/file', {}, self.work_tree,
261
self.branch.weave_store,
262
self.branch.get_transaction())
263
# expected outcome - file_1 has a revision id of '2', and we can get
264
# its text of 'file contents' out of the weave.
265
self.assertEqual(self.file_1.revision, '1')
266
self.assertEqual(self.file_active.revision, '2')
267
# this should be a separate test probably, but lets check it once..
268
lines = self.branch.weave_store.get_lines('fileid','2',
269
self.branch.get_transaction())
270
self.assertEqual(lines, ['contents of subdir/file\n'])
272
def test_snapshot_unchanged(self):
273
#This tests that a simple commit does not make a new entry for
274
# an unchanged inventory entry
275
self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
276
self.work_tree, self.branch.weave_store,
277
self.branch.get_transaction())
278
self.assertEqual(self.file_1.revision, '1')
279
self.assertEqual(self.file_active.revision, '1')
280
self.assertRaises(errors.WeaveError,
281
self.branch.weave_store.get_lines, 'fileid', '2',
282
self.branch.get_transaction())
284
def test_snapshot_merge_identical_different_revid(self):
285
# This tests that a commit with two identical parents, one of which has
286
# a different revision id, results in a new revision id in the entry.
287
# 1->other, commit a merge of other against 1, results in 2.
288
other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
289
other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
290
other_ie.revision = '1'
291
other_ie.text_sha1 = self.file_1.text_sha1
292
other_ie.text_size = self.file_1.text_size
293
self.assertEqual(self.file_1, other_ie)
294
other_ie.revision = 'other'
295
self.assertNotEqual(self.file_1, other_ie)
296
self.branch.weave_store.add_identical_text('fileid', '1', 'other', ['1'],
297
self.branch.get_transaction())
298
self.file_active.snapshot('2', 'subdir/file',
299
{'1':self.file_1, 'other':other_ie},
300
self.work_tree, self.branch.weave_store,
301
self.branch.get_transaction())
302
self.assertEqual(self.file_active.revision, '2')
304
def test_snapshot_changed(self):
305
# This tests that a commit with one different parent results in a new
306
# revision id in the entry.
307
self.file_active.name='newname'
308
rename('subdir/file', 'subdir/newname')
309
self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1},
311
self.branch.weave_store,
312
self.branch.get_transaction())
313
# expected outcome - file_1 has a revision id of '2'
314
self.assertEqual(self.file_active.revision, '2')
317
class TestPreviousHeads(TestCaseInTempDir):
320
# we want several inventories, that respectively
321
# give use the following scenarios:
322
# A) fileid not in any inventory (A),
323
# B) fileid present in one inventory (B) and (A,B)
324
# C) fileid present in two inventories, and they
325
# are not mutual descendents (B, C)
326
# D) fileid present in two inventories and one is
327
# a descendent of the other. (B, D)
328
super(TestPreviousHeads, self).setUp()
329
self.build_tree(['file'])
330
self.branch = Branch.initialize(u'.')
331
self.wt = self.branch.working_tree()
332
self.wt.commit('new branch', allow_pointless=True, rev_id='A')
333
self.inv_A = self.branch.get_inventory('A')
334
self.branch.working_tree().add(['file'], ['fileid'])
335
self.wt.commit('add file', rev_id='B')
336
self.inv_B = self.branch.get_inventory('B')
337
self.branch.put_controlfile('revision-history', 'A\n')
338
self.assertEqual(self.branch.revision_history(), ['A'])
339
self.wt.commit('another add of file', rev_id='C')
340
self.inv_C = self.branch.get_inventory('C')
341
self.wt.add_pending_merge('B')
342
self.wt.commit('merge in B', rev_id='D')
343
self.inv_D = self.branch.get_inventory('D')
344
self.file_active = self.wt.inventory['fileid']
345
self.weave = self.branch.weave_store.get_weave('fileid',
346
self.branch.get_transaction())
348
def get_previous_heads(self, inventories):
349
return self.file_active.find_previous_heads(inventories, self.weave)
351
def test_fileid_in_no_inventory(self):
352
self.assertEqual({}, self.get_previous_heads([self.inv_A]))
354
def test_fileid_in_one_inventory(self):
355
self.assertEqual({'B':self.inv_B['fileid']},
356
self.get_previous_heads([self.inv_B]))
357
self.assertEqual({'B':self.inv_B['fileid']},
358
self.get_previous_heads([self.inv_A, self.inv_B]))
359
self.assertEqual({'B':self.inv_B['fileid']},
360
self.get_previous_heads([self.inv_B, self.inv_A]))
362
def test_fileid_in_two_inventories_gives_both_entries(self):
363
self.assertEqual({'B':self.inv_B['fileid'],
364
'C':self.inv_C['fileid']},
365
self.get_previous_heads([self.inv_B, self.inv_C]))
366
self.assertEqual({'B':self.inv_B['fileid'],
367
'C':self.inv_C['fileid']},
368
self.get_previous_heads([self.inv_C, self.inv_B]))
370
def test_fileid_in_two_inventories_already_merged_gives_head(self):
371
self.assertEqual({'D':self.inv_D['fileid']},
372
self.get_previous_heads([self.inv_B, self.inv_D]))
373
self.assertEqual({'D':self.inv_D['fileid']},
374
self.get_previous_heads([self.inv_D, self.inv_B]))
376
# TODO: test two inventories with the same file revision
655
def test_make_entry(self):
656
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
657
inventory.InventoryFile)
658
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
659
inventory.InventoryLink)
660
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
661
inventory.InventoryDirectory)
663
def test_make_entry_non_normalized(self):
664
orig_normalized_filename = osutils.normalized_filename
667
osutils.normalized_filename = osutils._accessible_normalized_filename
668
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
669
self.assertEqual(u'\xe5', entry.name)
670
self.assertIsInstance(entry, inventory.InventoryFile)
672
osutils.normalized_filename = osutils._inaccessible_normalized_filename
673
self.assertRaises(errors.InvalidNormalization,
674
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
676
osutils.normalized_filename = orig_normalized_filename
679
class TestDescribeChanges(TestCase):
681
def test_describe_change(self):
682
# we need to test the following change combinations:
688
# renamed/reparented and modified
689
# change kind (perhaps can't be done yet?)
690
# also, merged in combination with all of these?
691
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
692
old_a.text_sha1 = '123132'
694
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
695
new_a.text_sha1 = '123132'
698
self.assertChangeDescription('unchanged', old_a, new_a)
701
new_a.text_sha1 = 'abcabc'
702
self.assertChangeDescription('modified', old_a, new_a)
704
self.assertChangeDescription('added', None, new_a)
705
self.assertChangeDescription('removed', old_a, None)
706
# perhaps a bit questionable but seems like the most reasonable thing...
707
self.assertChangeDescription('unchanged', None, None)
709
# in this case it's both renamed and modified; show a rename and
711
new_a.name = 'newfilename'
712
self.assertChangeDescription('modified and renamed', old_a, new_a)
714
# reparenting is 'renaming'
715
new_a.name = old_a.name
716
new_a.parent_id = 'somedir-id'
717
self.assertChangeDescription('modified and renamed', old_a, new_a)
719
# reset the content values so its not modified
720
new_a.text_size = old_a.text_size
721
new_a.text_sha1 = old_a.text_sha1
722
new_a.name = old_a.name
724
new_a.name = 'newfilename'
725
self.assertChangeDescription('renamed', old_a, new_a)
727
# reparenting is 'renaming'
728
new_a.name = old_a.name
729
new_a.parent_id = 'somedir-id'
730
self.assertChangeDescription('renamed', old_a, new_a)
732
def assertChangeDescription(self, expected_change, old_ie, new_ie):
733
change = InventoryEntry.describe_change(old_ie, new_ie)
734
self.assertEqual(expected_change, change)
737
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
739
def get_chk_bytes(self):
740
factory = groupcompress.make_pack_factory(True, True, 1)
741
trans = self.get_transport('')
742
return factory(trans)
744
def read_bytes(self, chk_bytes, key):
745
stream = chk_bytes.get_record_stream([key], 'unordered', True)
746
return stream.next().get_bytes_as("fulltext")
748
def test_deserialise_gives_CHKInventory(self):
750
inv.revision_id = "revid"
751
inv.root.revision = "rootrev"
752
chk_bytes = self.get_chk_bytes()
753
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
754
bytes = ''.join(chk_inv.to_lines())
755
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
756
self.assertEqual("revid", new_inv.revision_id)
757
self.assertEqual("directory", new_inv.root.kind)
758
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
759
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
760
self.assertEqual(inv.root.name, new_inv.root.name)
761
self.assertEqual("rootrev", new_inv.root.revision)
762
self.assertEqual('plain', new_inv._search_key_name)
764
def test_deserialise_wrong_revid(self):
766
inv.revision_id = "revid"
767
inv.root.revision = "rootrev"
768
chk_bytes = self.get_chk_bytes()
769
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
770
bytes = ''.join(chk_inv.to_lines())
771
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
774
def test_captures_rev_root_byid(self):
776
inv.revision_id = "foo"
777
inv.root.revision = "bar"
778
chk_bytes = self.get_chk_bytes()
779
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
780
lines = chk_inv.to_lines()
783
'revision_id: foo\n',
784
'root_id: TREE_ROOT\n',
785
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
786
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
788
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
789
self.assertEqual('plain', chk_inv._search_key_name)
791
def test_captures_parent_id_basename_index(self):
793
inv.revision_id = "foo"
794
inv.root.revision = "bar"
795
chk_bytes = self.get_chk_bytes()
796
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
797
lines = chk_inv.to_lines()
800
'revision_id: foo\n',
801
'root_id: TREE_ROOT\n',
802
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
803
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
805
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
806
self.assertEqual('plain', chk_inv._search_key_name)
808
def test_captures_search_key_name(self):
810
inv.revision_id = "foo"
811
inv.root.revision = "bar"
812
chk_bytes = self.get_chk_bytes()
813
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
814
search_key_name='hash-16-way')
815
lines = chk_inv.to_lines()
818
'search_key_name: hash-16-way\n',
819
'root_id: TREE_ROOT\n',
820
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
821
'revision_id: foo\n',
822
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
824
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
825
self.assertEqual('hash-16-way', chk_inv._search_key_name)
827
def test_directory_children_on_demand(self):
829
inv.revision_id = "revid"
830
inv.root.revision = "rootrev"
831
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
832
inv["fileid"].revision = "filerev"
833
inv["fileid"].executable = True
834
inv["fileid"].text_sha1 = "ffff"
835
inv["fileid"].text_size = 1
836
chk_bytes = self.get_chk_bytes()
837
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
838
bytes = ''.join(chk_inv.to_lines())
839
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
840
root_entry = new_inv[inv.root.file_id]
841
self.assertEqual(None, root_entry._children)
842
self.assertEqual(['file'], root_entry.children.keys())
843
file_direct = new_inv["fileid"]
844
file_found = root_entry.children['file']
845
self.assertEqual(file_direct.kind, file_found.kind)
846
self.assertEqual(file_direct.file_id, file_found.file_id)
847
self.assertEqual(file_direct.parent_id, file_found.parent_id)
848
self.assertEqual(file_direct.name, file_found.name)
849
self.assertEqual(file_direct.revision, file_found.revision)
850
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
851
self.assertEqual(file_direct.text_size, file_found.text_size)
852
self.assertEqual(file_direct.executable, file_found.executable)
854
def test_from_inventory_maximum_size(self):
855
# from_inventory supports the maximum_size parameter.
857
inv.revision_id = "revid"
858
inv.root.revision = "rootrev"
859
chk_bytes = self.get_chk_bytes()
860
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
861
chk_inv.id_to_entry._ensure_root()
862
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
863
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
864
p_id_basename = chk_inv.parent_id_basename_to_file_id
865
p_id_basename._ensure_root()
866
self.assertEqual(120, p_id_basename._root_node.maximum_size)
867
self.assertEqual(2, p_id_basename._root_node._key_width)
869
def test___iter__(self):
871
inv.revision_id = "revid"
872
inv.root.revision = "rootrev"
873
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
874
inv["fileid"].revision = "filerev"
875
inv["fileid"].executable = True
876
inv["fileid"].text_sha1 = "ffff"
877
inv["fileid"].text_size = 1
878
chk_bytes = self.get_chk_bytes()
879
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
880
bytes = ''.join(chk_inv.to_lines())
881
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
882
fileids = list(new_inv.__iter__())
884
self.assertEqual([inv.root.file_id, "fileid"], fileids)
886
def test__len__(self):
888
inv.revision_id = "revid"
889
inv.root.revision = "rootrev"
890
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
891
inv["fileid"].revision = "filerev"
892
inv["fileid"].executable = True
893
inv["fileid"].text_sha1 = "ffff"
894
inv["fileid"].text_size = 1
895
chk_bytes = self.get_chk_bytes()
896
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
897
self.assertEqual(2, len(chk_inv))
899
def test___getitem__(self):
901
inv.revision_id = "revid"
902
inv.root.revision = "rootrev"
903
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
904
inv["fileid"].revision = "filerev"
905
inv["fileid"].executable = True
906
inv["fileid"].text_sha1 = "ffff"
907
inv["fileid"].text_size = 1
908
chk_bytes = self.get_chk_bytes()
909
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
910
bytes = ''.join(chk_inv.to_lines())
911
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
912
root_entry = new_inv[inv.root.file_id]
913
file_entry = new_inv["fileid"]
914
self.assertEqual("directory", root_entry.kind)
915
self.assertEqual(inv.root.file_id, root_entry.file_id)
916
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
917
self.assertEqual(inv.root.name, root_entry.name)
918
self.assertEqual("rootrev", root_entry.revision)
919
self.assertEqual("file", file_entry.kind)
920
self.assertEqual("fileid", file_entry.file_id)
921
self.assertEqual(inv.root.file_id, file_entry.parent_id)
922
self.assertEqual("file", file_entry.name)
923
self.assertEqual("filerev", file_entry.revision)
924
self.assertEqual("ffff", file_entry.text_sha1)
925
self.assertEqual(1, file_entry.text_size)
926
self.assertEqual(True, file_entry.executable)
927
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
929
def test_has_id_true(self):
931
inv.revision_id = "revid"
932
inv.root.revision = "rootrev"
933
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
934
inv["fileid"].revision = "filerev"
935
inv["fileid"].executable = True
936
inv["fileid"].text_sha1 = "ffff"
937
inv["fileid"].text_size = 1
938
chk_bytes = self.get_chk_bytes()
939
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
940
self.assertTrue(chk_inv.has_id('fileid'))
941
self.assertTrue(chk_inv.has_id(inv.root.file_id))
943
def test_has_id_not(self):
945
inv.revision_id = "revid"
946
inv.root.revision = "rootrev"
947
chk_bytes = self.get_chk_bytes()
948
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
949
self.assertFalse(chk_inv.has_id('fileid'))
951
def test_id2path(self):
953
inv.revision_id = "revid"
954
inv.root.revision = "rootrev"
955
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
956
fileentry = InventoryFile("fileid", "file", "dirid")
959
inv["fileid"].revision = "filerev"
960
inv["fileid"].executable = True
961
inv["fileid"].text_sha1 = "ffff"
962
inv["fileid"].text_size = 1
963
inv["dirid"].revision = "filerev"
964
chk_bytes = self.get_chk_bytes()
965
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
966
bytes = ''.join(chk_inv.to_lines())
967
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
968
self.assertEqual('', new_inv.id2path(inv.root.file_id))
969
self.assertEqual('dir', new_inv.id2path('dirid'))
970
self.assertEqual('dir/file', new_inv.id2path('fileid'))
972
def test_path2id(self):
974
inv.revision_id = "revid"
975
inv.root.revision = "rootrev"
976
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
977
fileentry = InventoryFile("fileid", "file", "dirid")
980
inv["fileid"].revision = "filerev"
981
inv["fileid"].executable = True
982
inv["fileid"].text_sha1 = "ffff"
983
inv["fileid"].text_size = 1
984
inv["dirid"].revision = "filerev"
985
chk_bytes = self.get_chk_bytes()
986
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
987
bytes = ''.join(chk_inv.to_lines())
988
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
989
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
990
self.assertEqual('dirid', new_inv.path2id('dir'))
991
self.assertEqual('fileid', new_inv.path2id('dir/file'))
993
def test_create_by_apply_delta_sets_root(self):
995
inv.revision_id = "revid"
996
chk_bytes = self.get_chk_bytes()
997
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
998
inv.add_path("", "directory", "myrootid", None)
999
inv.revision_id = "expectedid"
1000
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1001
delta = [("", None, base_inv.root.file_id, None),
1002
(None, "", "myrootid", inv.root)]
1003
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1004
self.assertEquals(reference_inv.root, new_inv.root)
1006
def test_create_by_apply_delta_empty_add_child(self):
1008
inv.revision_id = "revid"
1009
inv.root.revision = "rootrev"
1010
chk_bytes = self.get_chk_bytes()
1011
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1012
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1013
a_entry.revision = "filerev"
1014
a_entry.executable = True
1015
a_entry.text_sha1 = "ffff"
1016
a_entry.text_size = 1
1018
inv.revision_id = "expectedid"
1019
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1020
delta = [(None, "A", "A-id", a_entry)]
1021
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1022
# new_inv should be the same as reference_inv.
1023
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1024
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1025
reference_inv.id_to_entry._ensure_root()
1026
new_inv.id_to_entry._ensure_root()
1027
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1028
new_inv.id_to_entry._root_node._key)
1030
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1032
inv.revision_id = "revid"
1033
inv.root.revision = "rootrev"
1034
chk_bytes = self.get_chk_bytes()
1035
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1036
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1037
a_entry.revision = "filerev"
1038
a_entry.executable = True
1039
a_entry.text_sha1 = "ffff"
1040
a_entry.text_size = 1
1042
inv.revision_id = "expectedid"
1043
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1044
delta = [(None, "A", "A-id", a_entry)]
1045
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1046
reference_inv.id_to_entry._ensure_root()
1047
reference_inv.parent_id_basename_to_file_id._ensure_root()
1048
new_inv.id_to_entry._ensure_root()
1049
new_inv.parent_id_basename_to_file_id._ensure_root()
1050
# new_inv should be the same as reference_inv.
1051
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1052
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1053
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1054
new_inv.id_to_entry._root_node._key)
1055
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1056
new_inv.parent_id_basename_to_file_id._root_node._key)
1058
def test_iter_changes(self):
1059
# Low level bootstrapping smoke test; comprehensive generic tests via
1060
# InterTree are coming.
1062
inv.revision_id = "revid"
1063
inv.root.revision = "rootrev"
1064
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1065
inv["fileid"].revision = "filerev"
1066
inv["fileid"].executable = True
1067
inv["fileid"].text_sha1 = "ffff"
1068
inv["fileid"].text_size = 1
1070
inv2.revision_id = "revid2"
1071
inv2.root.revision = "rootrev"
1072
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1073
inv2["fileid"].revision = "filerev2"
1074
inv2["fileid"].executable = False
1075
inv2["fileid"].text_sha1 = "bbbb"
1076
inv2["fileid"].text_size = 2
1077
# get fresh objects.
1078
chk_bytes = self.get_chk_bytes()
1079
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1080
bytes = ''.join(chk_inv.to_lines())
1081
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1082
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1083
bytes = ''.join(chk_inv2.to_lines())
1084
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1085
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1086
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1088
list(inv_1.iter_changes(inv_2)))
1090
def test_parent_id_basename_to_file_id_index_enabled(self):
1092
inv.revision_id = "revid"
1093
inv.root.revision = "rootrev"
1094
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1095
inv["fileid"].revision = "filerev"
1096
inv["fileid"].executable = True
1097
inv["fileid"].text_sha1 = "ffff"
1098
inv["fileid"].text_size = 1
1099
# get fresh objects.
1100
chk_bytes = self.get_chk_bytes()
1101
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1102
bytes = ''.join(tmp_inv.to_lines())
1103
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1104
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1106
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1107
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1109
def test_file_entry_to_bytes(self):
1110
inv = CHKInventory(None)
1111
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1112
ie.executable = True
1113
ie.revision = 'file-rev-id'
1114
ie.text_sha1 = 'abcdefgh'
1116
bytes = inv._entry_to_bytes(ie)
1117
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1118
'file-rev-id\nabcdefgh\n100\nY', bytes)
1119
ie2 = inv._bytes_to_entry(bytes)
1120
self.assertEqual(ie, ie2)
1121
self.assertIsInstance(ie2.name, unicode)
1122
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1123
inv._bytes_to_utf8name_key(bytes))
1125
def test_file2_entry_to_bytes(self):
1126
inv = CHKInventory(None)
1128
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1129
ie.executable = False
1130
ie.revision = 'file-rev-id'
1131
ie.text_sha1 = '123456'
1133
bytes = inv._entry_to_bytes(ie)
1134
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1135
'file-rev-id\n123456\n25\nN', bytes)
1136
ie2 = inv._bytes_to_entry(bytes)
1137
self.assertEqual(ie, ie2)
1138
self.assertIsInstance(ie2.name, unicode)
1139
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1140
inv._bytes_to_utf8name_key(bytes))
1142
def test_dir_entry_to_bytes(self):
1143
inv = CHKInventory(None)
1144
ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1145
ie.revision = 'dir-rev-id'
1146
bytes = inv._entry_to_bytes(ie)
1147
self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1148
ie2 = inv._bytes_to_entry(bytes)
1149
self.assertEqual(ie, ie2)
1150
self.assertIsInstance(ie2.name, unicode)
1151
self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1152
inv._bytes_to_utf8name_key(bytes))
1154
def test_dir2_entry_to_bytes(self):
1155
inv = CHKInventory(None)
1156
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1158
ie.revision = 'dir-rev-id'
1159
bytes = inv._entry_to_bytes(ie)
1160
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1161
'dir-rev-id', bytes)
1162
ie2 = inv._bytes_to_entry(bytes)
1163
self.assertEqual(ie, ie2)
1164
self.assertIsInstance(ie2.name, unicode)
1165
self.assertIs(ie2.parent_id, None)
1166
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1167
inv._bytes_to_utf8name_key(bytes))
1169
def test_symlink_entry_to_bytes(self):
1170
inv = CHKInventory(None)
1171
ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1172
ie.revision = 'link-rev-id'
1173
ie.symlink_target = u'target/path'
1174
bytes = inv._entry_to_bytes(ie)
1175
self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1176
'link-rev-id\ntarget/path', bytes)
1177
ie2 = inv._bytes_to_entry(bytes)
1178
self.assertEqual(ie, ie2)
1179
self.assertIsInstance(ie2.name, unicode)
1180
self.assertIsInstance(ie2.symlink_target, unicode)
1181
self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1182
inv._bytes_to_utf8name_key(bytes))
1184
def test_symlink2_entry_to_bytes(self):
1185
inv = CHKInventory(None)
1186
ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1187
ie.revision = 'link-rev-id'
1188
ie.symlink_target = u'target/\u03a9path'
1189
bytes = inv._entry_to_bytes(ie)
1190
self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1191
'link-rev-id\ntarget/\xce\xa9path', bytes)
1192
ie2 = inv._bytes_to_entry(bytes)
1193
self.assertEqual(ie, ie2)
1194
self.assertIsInstance(ie2.name, unicode)
1195
self.assertIsInstance(ie2.symlink_target, unicode)
1196
self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1197
inv._bytes_to_utf8name_key(bytes))
1199
def test_tree_reference_entry_to_bytes(self):
1200
inv = CHKInventory(None)
1201
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1203
ie.revision = 'tree-rev-id'
1204
ie.reference_revision = 'ref-rev-id'
1205
bytes = inv._entry_to_bytes(ie)
1206
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1207
'tree-rev-id\nref-rev-id', bytes)
1208
ie2 = inv._bytes_to_entry(bytes)
1209
self.assertEqual(ie, ie2)
1210
self.assertIsInstance(ie2.name, unicode)
1211
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1212
inv._bytes_to_utf8name_key(bytes))
1215
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1217
def get_chk_bytes(self):
1218
factory = groupcompress.make_pack_factory(True, True, 1)
1219
trans = self.get_transport('')
1220
return factory(trans)
1222
def make_dir(self, inv, name, parent_id):
1223
inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1225
def make_file(self, inv, name, parent_id, content='content\n'):
1226
ie = inv.make_entry('file', name, parent_id, name + '-id')
1227
ie.text_sha1 = osutils.sha_string(content)
1228
ie.text_size = len(content)
1231
def make_simple_inventory(self):
1232
inv = Inventory('TREE_ROOT')
1233
inv.revision_id = "revid"
1234
inv.root.revision = "rootrev"
1237
# sub-file1 sub-file1-id
1238
# sub-file2 sub-file2-id
1239
# sub-dir1/ sub-dir1-id
1240
# subsub-file1 subsub-file1-id
1242
# sub2-file1 sub2-file1-id
1244
self.make_dir(inv, 'dir1', 'TREE_ROOT')
1245
self.make_dir(inv, 'dir2', 'TREE_ROOT')
1246
self.make_dir(inv, 'sub-dir1', 'dir1-id')
1247
self.make_file(inv, 'top', 'TREE_ROOT')
1248
self.make_file(inv, 'sub-file1', 'dir1-id')
1249
self.make_file(inv, 'sub-file2', 'dir1-id')
1250
self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1251
self.make_file(inv, 'sub2-file1', 'dir2-id')
1252
chk_bytes = self.get_chk_bytes()
1253
# use a small maximum_size to force internal paging structures
1254
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1256
search_key_name='hash-255-way')
1257
bytes = ''.join(chk_inv.to_lines())
1258
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1260
def assert_Getitems(self, expected_fileids, inv, file_ids):
1261
self.assertEqual(sorted(expected_fileids),
1262
sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1264
def assertExpand(self, all_ids, inv, file_ids):
1266
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1267
self.assertEqual(set(all_ids), val_all_ids)
1268
entries = inv._getitems(val_all_ids)
1269
expected_children = {}
1270
for entry in entries:
1271
s = expected_children.setdefault(entry.parent_id, [])
1272
s.append(entry.file_id)
1273
val_children = dict((k, sorted(v)) for k, v
1274
in val_children.iteritems())
1275
expected_children = dict((k, sorted(v)) for k, v
1276
in expected_children.iteritems())
1277
self.assertEqual(expected_children, val_children)
1279
def test_make_simple_inventory(self):
1280
inv = self.make_simple_inventory()
1282
for path, entry in inv.iter_entries_by_dir():
1283
layout.append((path, entry.file_id))
1286
('dir1', 'dir1-id'),
1287
('dir2', 'dir2-id'),
1289
('dir1/sub-dir1', 'sub-dir1-id'),
1290
('dir1/sub-file1', 'sub-file1-id'),
1291
('dir1/sub-file2', 'sub-file2-id'),
1292
('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1293
('dir2/sub2-file1', 'sub2-file1-id'),
1296
def test__getitems(self):
1297
inv = self.make_simple_inventory()
1299
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1300
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1301
self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1303
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1305
self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1306
['dir1-id', 'sub-file2-id'])
1307
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1308
self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1310
def test_single_file(self):
1311
inv = self.make_simple_inventory()
1312
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1314
def test_get_all_parents(self):
1315
inv = self.make_simple_inventory()
1316
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1318
], inv, ['subsub-file1-id'])
1320
def test_get_children(self):
1321
inv = self.make_simple_inventory()
1322
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1323
'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1324
], inv, ['dir1-id'])
1326
def test_from_root(self):
1327
inv = self.make_simple_inventory()
1328
self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1329
'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1330
'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1332
def test_top_level_file(self):
1333
inv = self.make_simple_inventory()
1334
self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1336
def test_subsub_file(self):
1337
inv = self.make_simple_inventory()
1338
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1339
'subsub-file1-id'], inv, ['subsub-file1-id'])
1341
def test_sub_and_root(self):
1342
inv = self.make_simple_inventory()
1343
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1344
'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])