1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
28
InventoryDirectory, InventoryEntry, TreeReference)
29
from bzrlib.tests import (
31
TestCaseWithTransport,
34
split_suite_by_condition,
36
from bzrlib.tests.per_workingtree import workingtree_formats
39
def load_tests(standard_tests, module, loader):
40
"""Parameterise some inventory tests."""
41
to_adapt, result = split_suite_by_condition(standard_tests,
42
condition_isinstance(TestDeltaApplication))
44
('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
# Working tree basis delta application
47
# Repository add_inv_by_delta.
48
# Reduce form of the per_repository test logic - that logic needs to be
49
# be able to get /just/ repositories whereas these tests are fine with
50
# just creating trees.
52
for _, format in repository.format_registry.iteritems():
53
scenarios.append((str(format.__name__), {
54
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
for format in workingtree_formats():
58
(str(format.__class__.__name__) + ".update_basis_by_delta", {
59
'apply_delta':apply_inventory_WT_basis,
62
(str(format.__class__.__name__) + ".apply_inventory_delta", {
63
'apply_delta':apply_inventory_WT,
65
return multiply_tests(to_adapt, scenarios, result)
68
def apply_inventory_Inventory(self, basis, delta):
69
"""Apply delta to basis and return the result.
71
:param basis: An inventory to be used as the basis.
72
:param delta: The inventory delta to apply:
73
:return: An inventory resulting from the application.
75
basis.apply_delta(delta)
79
def apply_inventory_WT(self, basis, delta):
80
"""Apply delta to basis and return the result.
82
This sets the tree state to be basis, and then calls apply_inventory_delta.
84
:param basis: An inventory to be used as the basis.
85
:param delta: The inventory delta to apply:
86
:return: An inventory resulting from the application.
88
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
89
control.create_repository()
90
control.create_branch()
91
tree = self.format.initialize(control)
94
tree._write_inventory(basis)
97
# Fresh object, reads disk again.
98
tree = tree.bzrdir.open_workingtree()
101
tree.apply_inventory_delta(delta)
104
# reload tree - ensure we get what was written.
105
tree = tree.bzrdir.open_workingtree()
107
self.addCleanup(tree.unlock)
108
# One could add 'tree._validate' here but that would cause 'early' failues
109
# as far as higher level code is concerned. Possibly adding an
110
# expect_fail parameter to this function and if that is False then do a
112
return tree.inventory
115
def apply_inventory_WT_basis(self, basis, delta):
116
"""Apply delta to basis and return the result.
118
This sets the parent and then calls update_basis_by_delta.
119
It also puts the basis in the repository under both 'basis' and 'result' to
120
allow safety checks made by the WT to succeed, and finally ensures that all
121
items in the delta with a new path are present in the WT before calling
122
update_basis_by_delta.
124
:param basis: An inventory to be used as the basis.
125
:param delta: The inventory delta to apply:
126
:return: An inventory resulting from the application.
128
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
129
control.create_repository()
130
control.create_branch()
131
tree = self.format.initialize(control)
134
repo = tree.branch.repository
135
repo.start_write_group()
137
rev = revision.Revision('basis', timestamp=0, timezone=None,
138
message="", committer="foo@example.com")
139
basis.revision_id = 'basis'
140
repo.add_revision('basis', rev, basis)
141
# Add a revision for the result, with the basis content -
142
# update_basis_by_delta doesn't check that the delta results in
143
# result, and we want inconsistent deltas to get called on the
144
# tree, or else the code isn't actually checked.
145
rev = revision.Revision('result', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = 'result'
148
repo.add_revision('result', rev, basis)
150
repo.abort_write_group()
153
repo.commit_write_group()
154
# Set the basis state as the trees current state
155
tree._write_inventory(basis)
156
# This reads basis from the repo and puts it into the tree's local
157
# cache, if it has one.
158
tree.set_parent_ids(['basis'])
161
for old, new, id, entry in delta:
162
if None in (new, entry):
164
paths[new] = (entry.file_id, entry.kind)
165
parents.add(osutils.dirname(new))
166
parents = osutils.minimum_path_selection(parents)
168
# Put place holders in the tree to permit adding the other entries.
169
for pos, parent in enumerate(parents):
170
if not tree.path2id(parent):
171
# add a synthetic directory in the tree so we can can put the
172
# tree0 entries in place for dirstate.
173
tree.add([parent], ["id%d" % pos], ["directory"])
175
# Many deltas may cause this mini-apply to fail, but we want to see what
176
# the delta application code says, not the prep that we do to deal with
177
# limitations of dirstate's update_basis code.
178
for path, (file_id, kind) in sorted(paths.items()):
180
tree.add([path], [file_id], [kind])
181
except (KeyboardInterrupt, SystemExit):
187
# Fresh lock, reads disk again.
190
tree.update_basis_by_delta('result', delta)
193
# reload tree - ensure we get what was written.
194
tree = tree.bzrdir.open_workingtree()
195
basis_tree = tree.basis_tree()
196
basis_tree.lock_read()
197
self.addCleanup(basis_tree.unlock)
198
# Note, that if the tree does not have a local cache, the trick above of
199
# setting the result as the basis, will come back to bite us. That said,
200
# all the implementations in bzr do have a local cache.
201
return basis_tree.inventory
204
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
205
"""Apply delta to basis and return the result.
207
This inserts basis as a whole inventory and then uses
208
add_inventory_by_delta to add delta.
210
:param basis: An inventory to be used as the basis.
211
:param delta: The inventory delta to apply:
212
:return: An inventory resulting from the application.
214
format = self.format()
215
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
216
repo = format.initialize(control)
219
repo.start_write_group()
221
rev = revision.Revision('basis', timestamp=0, timezone=None,
222
message="", committer="foo@example.com")
223
basis.revision_id = 'basis'
224
repo.add_revision('basis', rev, basis)
226
repo.abort_write_group()
229
repo.commit_write_group()
234
repo.start_write_group()
236
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
239
repo.abort_write_group()
242
repo.commit_write_group()
245
# Fresh lock, reads disk again.
246
repo = repo.bzrdir.open_repository()
248
self.addCleanup(repo.unlock)
249
return repo.get_inventory('result')
252
class TestDeltaApplication(TestCaseWithTransport):
254
def get_empty_inventory(self, reference_inv=None):
255
"""Get an empty inventory.
257
Note that tests should not depend on the revision of the root for
258
setting up test conditions, as it has to be flexible to accomodate non
259
rich root repositories.
261
:param reference_inv: If not None, get the revision for the root from
262
this inventory. This is useful for dealing with older repositories
263
that routinely discarded the root entry data. If None, the root's
264
revision is set to 'basis'.
266
inv = inventory.Inventory()
267
if reference_inv is not None:
268
inv.root.revision = reference_inv.root.revision
270
inv.root.revision = 'basis'
273
def test_empty_delta(self):
274
inv = self.get_empty_inventory()
276
inv = self.apply_delta(self, inv, delta)
277
inv2 = self.get_empty_inventory(inv)
278
self.assertEqual([], inv2._make_delta(inv))
280
def test_None_file_id(self):
281
inv = self.get_empty_inventory()
282
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
283
dir1.revision = 'result'
284
delta = [(None, u'dir1', None, dir1)]
285
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
288
def test_unicode_file_id(self):
289
inv = self.get_empty_inventory()
290
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
291
dir1.revision = 'result'
292
delta = [(None, u'dir1', dir1.file_id, dir1)]
293
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
296
def test_repeated_file_id(self):
297
inv = self.get_empty_inventory()
298
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
299
file1.revision = 'result'
302
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
303
file2.revision = 'result'
306
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
307
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
310
def test_repeated_new_path(self):
311
inv = self.get_empty_inventory()
312
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
313
file1.revision = 'result'
316
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
317
file2.revision = 'result'
320
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
321
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
324
def test_repeated_old_path(self):
325
inv = self.get_empty_inventory()
326
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
327
file1.revision = 'result'
330
# We can't *create* a source inventory with the same path, but
331
# a badly generated partial delta might claim the same source twice.
332
# This would be buggy in two ways: the path is repeated in the delta,
333
# And the path for one of the file ids doesn't match the source
334
# location. Alternatively, we could have a repeated fileid, but that
335
# is separately checked for.
336
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
337
file2.revision = 'result'
342
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
343
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
346
def test_mismatched_id_entry_id(self):
347
inv = self.get_empty_inventory()
348
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
349
file1.revision = 'result'
352
delta = [(None, u'path', 'id', file1)]
353
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
356
def test_mismatched_new_path_entry_None(self):
357
inv = self.get_empty_inventory()
358
delta = [(None, u'path', 'id', None)]
359
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
362
def test_mismatched_new_path_None_entry(self):
363
inv = self.get_empty_inventory()
364
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
365
file1.revision = 'result'
368
delta = [(u"path", None, 'id1', file1)]
369
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
372
def test_parent_is_not_directory(self):
373
inv = self.get_empty_inventory()
374
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
375
file1.revision = 'result'
378
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
379
file2.revision = 'result'
383
delta = [(None, u'path/path2', 'id2', file2)]
384
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
def test_parent_is_missing(self):
388
inv = self.get_empty_inventory()
389
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
390
file2.revision = 'result'
393
delta = [(None, u'path/path2', 'id2', file2)]
394
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
397
def test_new_parent_path_has_wrong_id(self):
398
inv = self.get_empty_inventory()
399
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
400
parent1.revision = 'result'
401
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
402
parent2.revision = 'result'
403
file1 = inventory.InventoryFile('id', 'path', 'p-2')
404
file1.revision = 'result'
409
# This delta claims that file1 is at dir/path, but actually its at
410
# dir2/path if you follow the inventory parent structure.
411
delta = [(None, u'dir/path', 'id', file1)]
412
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
415
def test_old_parent_path_is_wrong(self):
416
inv = self.get_empty_inventory()
417
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
418
parent1.revision = 'result'
419
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
420
parent2.revision = 'result'
421
file1 = inventory.InventoryFile('id', 'path', 'p-2')
422
file1.revision = 'result'
428
# This delta claims that file1 was at dir/path, but actually it was at
429
# dir2/path if you follow the inventory parent structure.
430
delta = [(u'dir/path', None, 'id', None)]
431
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
434
def test_old_parent_path_is_for_other_id(self):
435
inv = self.get_empty_inventory()
436
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
437
parent1.revision = 'result'
438
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
439
parent2.revision = 'result'
440
file1 = inventory.InventoryFile('id', 'path', 'p-2')
441
file1.revision = 'result'
444
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
445
file2.revision = 'result'
452
# This delta claims that file1 was at dir/path, but actually it was at
453
# dir2/path if you follow the inventory parent structure. At dir/path
454
# is another entry we should not delete.
455
delta = [(u'dir/path', None, 'id', None)]
456
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
459
def test_add_existing_id_new_path(self):
460
inv = self.get_empty_inventory()
461
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
462
parent1.revision = 'result'
463
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
464
parent2.revision = 'result'
466
delta = [(None, u'dir2', 'p-1', parent2)]
467
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
470
def test_add_new_id_existing_path(self):
471
inv = self.get_empty_inventory()
472
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
473
parent1.revision = 'result'
474
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
475
parent2.revision = 'result'
477
delta = [(None, u'dir1', 'p-2', parent2)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_remove_dir_leaving_dangling_child(self):
482
inv = self.get_empty_inventory()
483
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
484
dir1.revision = 'result'
485
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
486
dir2.revision = 'result'
487
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
488
dir3.revision = 'result'
492
delta = [(u'dir1', None, 'p-1', None),
493
(u'dir1/child2', None, 'p-3', None)]
494
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
498
class TestInventoryEntry(TestCase):
500
def test_file_kind_character(self):
501
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
502
self.assertEqual(file.kind_character(), '')
504
def test_dir_kind_character(self):
505
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
506
self.assertEqual(dir.kind_character(), '/')
508
def test_link_kind_character(self):
509
dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
510
self.assertEqual(dir.kind_character(), '')
512
def test_dir_detect_changes(self):
513
left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
515
left.executable = True
516
left.symlink_target='foo'
517
right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
518
right.text_sha1 = 321
519
right.symlink_target='bar'
520
self.assertEqual((False, False), left.detect_changes(right))
521
self.assertEqual((False, False), right.detect_changes(left))
523
def test_file_detect_changes(self):
524
left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
526
right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
527
right.text_sha1 = 123
528
self.assertEqual((False, False), left.detect_changes(right))
529
self.assertEqual((False, False), right.detect_changes(left))
530
left.executable = True
531
self.assertEqual((False, True), left.detect_changes(right))
532
self.assertEqual((False, True), right.detect_changes(left))
533
right.text_sha1 = 321
534
self.assertEqual((True, True), left.detect_changes(right))
535
self.assertEqual((True, True), right.detect_changes(left))
537
def test_symlink_detect_changes(self):
538
left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
540
left.executable = True
541
left.symlink_target='foo'
542
right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
543
right.text_sha1 = 321
544
right.symlink_target='foo'
545
self.assertEqual((False, False), left.detect_changes(right))
546
self.assertEqual((False, False), right.detect_changes(left))
547
left.symlink_target = 'different'
548
self.assertEqual((True, False), left.detect_changes(right))
549
self.assertEqual((True, False), right.detect_changes(left))
551
def test_file_has_text(self):
552
file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
553
self.failUnless(file.has_text())
555
def test_directory_has_text(self):
556
dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
557
self.failIf(dir.has_text())
559
def test_link_has_text(self):
560
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
561
self.failIf(link.has_text())
563
def test_make_entry(self):
564
self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
565
inventory.InventoryFile)
566
self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
567
inventory.InventoryLink)
568
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
569
inventory.InventoryDirectory)
571
def test_make_entry_non_normalized(self):
572
orig_normalized_filename = osutils.normalized_filename
575
osutils.normalized_filename = osutils._accessible_normalized_filename
576
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
577
self.assertEqual(u'\xe5', entry.name)
578
self.assertIsInstance(entry, inventory.InventoryFile)
580
osutils.normalized_filename = osutils._inaccessible_normalized_filename
581
self.assertRaises(errors.InvalidNormalization,
582
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
584
osutils.normalized_filename = orig_normalized_filename
587
class TestDescribeChanges(TestCase):
589
def test_describe_change(self):
590
# we need to test the following change combinations:
596
# renamed/reparented and modified
597
# change kind (perhaps can't be done yet?)
598
# also, merged in combination with all of these?
599
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
600
old_a.text_sha1 = '123132'
602
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
603
new_a.text_sha1 = '123132'
606
self.assertChangeDescription('unchanged', old_a, new_a)
609
new_a.text_sha1 = 'abcabc'
610
self.assertChangeDescription('modified', old_a, new_a)
612
self.assertChangeDescription('added', None, new_a)
613
self.assertChangeDescription('removed', old_a, None)
614
# perhaps a bit questionable but seems like the most reasonable thing...
615
self.assertChangeDescription('unchanged', None, None)
617
# in this case it's both renamed and modified; show a rename and
619
new_a.name = 'newfilename'
620
self.assertChangeDescription('modified and renamed', old_a, new_a)
622
# reparenting is 'renaming'
623
new_a.name = old_a.name
624
new_a.parent_id = 'somedir-id'
625
self.assertChangeDescription('modified and renamed', old_a, new_a)
627
# reset the content values so its not modified
628
new_a.text_size = old_a.text_size
629
new_a.text_sha1 = old_a.text_sha1
630
new_a.name = old_a.name
632
new_a.name = 'newfilename'
633
self.assertChangeDescription('renamed', old_a, new_a)
635
# reparenting is 'renaming'
636
new_a.name = old_a.name
637
new_a.parent_id = 'somedir-id'
638
self.assertChangeDescription('renamed', old_a, new_a)
640
def assertChangeDescription(self, expected_change, old_ie, new_ie):
641
change = InventoryEntry.describe_change(old_ie, new_ie)
642
self.assertEqual(expected_change, change)
645
class TestCHKInventory(TestCaseWithTransport):
647
def get_chk_bytes(self):
648
# The easiest way to get a CHK store is a development6 repository and
649
# then work with the chk_bytes attribute directly.
650
repo = self.make_repository(".", format="development6-rich-root")
652
self.addCleanup(repo.unlock)
653
repo.start_write_group()
654
self.addCleanup(repo.abort_write_group)
655
return repo.chk_bytes
657
def read_bytes(self, chk_bytes, key):
658
stream = chk_bytes.get_record_stream([key], 'unordered', True)
659
return stream.next().get_bytes_as("fulltext")
661
def test_deserialise_gives_CHKInventory(self):
663
inv.revision_id = "revid"
664
inv.root.revision = "rootrev"
665
chk_bytes = self.get_chk_bytes()
666
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
667
bytes = ''.join(chk_inv.to_lines())
668
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
669
self.assertEqual("revid", new_inv.revision_id)
670
self.assertEqual("directory", new_inv.root.kind)
671
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
672
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
673
self.assertEqual(inv.root.name, new_inv.root.name)
674
self.assertEqual("rootrev", new_inv.root.revision)
675
self.assertEqual('plain', new_inv._search_key_name)
677
def test_deserialise_wrong_revid(self):
679
inv.revision_id = "revid"
680
inv.root.revision = "rootrev"
681
chk_bytes = self.get_chk_bytes()
682
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
683
bytes = ''.join(chk_inv.to_lines())
684
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
687
def test_captures_rev_root_byid(self):
689
inv.revision_id = "foo"
690
inv.root.revision = "bar"
691
chk_bytes = self.get_chk_bytes()
692
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
693
lines = chk_inv.to_lines()
696
'revision_id: foo\n',
697
'root_id: TREE_ROOT\n',
698
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
699
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
701
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
702
self.assertEqual('plain', chk_inv._search_key_name)
704
def test_captures_parent_id_basename_index(self):
706
inv.revision_id = "foo"
707
inv.root.revision = "bar"
708
chk_bytes = self.get_chk_bytes()
709
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
710
lines = chk_inv.to_lines()
713
'revision_id: foo\n',
714
'root_id: TREE_ROOT\n',
715
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
716
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
718
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
719
self.assertEqual('plain', chk_inv._search_key_name)
721
def test_captures_search_key_name(self):
723
inv.revision_id = "foo"
724
inv.root.revision = "bar"
725
chk_bytes = self.get_chk_bytes()
726
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
727
search_key_name='hash-16-way')
728
lines = chk_inv.to_lines()
731
'search_key_name: hash-16-way\n',
732
'root_id: TREE_ROOT\n',
733
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
734
'revision_id: foo\n',
735
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
737
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
738
self.assertEqual('hash-16-way', chk_inv._search_key_name)
740
def test_directory_children_on_demand(self):
742
inv.revision_id = "revid"
743
inv.root.revision = "rootrev"
744
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
745
inv["fileid"].revision = "filerev"
746
inv["fileid"].executable = True
747
inv["fileid"].text_sha1 = "ffff"
748
inv["fileid"].text_size = 1
749
chk_bytes = self.get_chk_bytes()
750
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
751
bytes = ''.join(chk_inv.to_lines())
752
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
753
root_entry = new_inv[inv.root.file_id]
754
self.assertEqual(None, root_entry._children)
755
self.assertEqual(['file'], root_entry.children.keys())
756
file_direct = new_inv["fileid"]
757
file_found = root_entry.children['file']
758
self.assertEqual(file_direct.kind, file_found.kind)
759
self.assertEqual(file_direct.file_id, file_found.file_id)
760
self.assertEqual(file_direct.parent_id, file_found.parent_id)
761
self.assertEqual(file_direct.name, file_found.name)
762
self.assertEqual(file_direct.revision, file_found.revision)
763
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
764
self.assertEqual(file_direct.text_size, file_found.text_size)
765
self.assertEqual(file_direct.executable, file_found.executable)
767
def test_from_inventory_maximum_size(self):
768
# from_inventory supports the maximum_size parameter.
770
inv.revision_id = "revid"
771
inv.root.revision = "rootrev"
772
chk_bytes = self.get_chk_bytes()
773
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
774
chk_inv.id_to_entry._ensure_root()
775
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
776
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
777
p_id_basename = chk_inv.parent_id_basename_to_file_id
778
p_id_basename._ensure_root()
779
self.assertEqual(120, p_id_basename._root_node.maximum_size)
780
self.assertEqual(2, p_id_basename._root_node._key_width)
782
def test___iter__(self):
784
inv.revision_id = "revid"
785
inv.root.revision = "rootrev"
786
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
787
inv["fileid"].revision = "filerev"
788
inv["fileid"].executable = True
789
inv["fileid"].text_sha1 = "ffff"
790
inv["fileid"].text_size = 1
791
chk_bytes = self.get_chk_bytes()
792
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
793
bytes = ''.join(chk_inv.to_lines())
794
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
795
fileids = list(new_inv.__iter__())
797
self.assertEqual([inv.root.file_id, "fileid"], fileids)
799
def test__len__(self):
801
inv.revision_id = "revid"
802
inv.root.revision = "rootrev"
803
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
804
inv["fileid"].revision = "filerev"
805
inv["fileid"].executable = True
806
inv["fileid"].text_sha1 = "ffff"
807
inv["fileid"].text_size = 1
808
chk_bytes = self.get_chk_bytes()
809
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
810
self.assertEqual(2, len(chk_inv))
812
def test___getitem__(self):
814
inv.revision_id = "revid"
815
inv.root.revision = "rootrev"
816
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
817
inv["fileid"].revision = "filerev"
818
inv["fileid"].executable = True
819
inv["fileid"].text_sha1 = "ffff"
820
inv["fileid"].text_size = 1
821
chk_bytes = self.get_chk_bytes()
822
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
823
bytes = ''.join(chk_inv.to_lines())
824
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
825
root_entry = new_inv[inv.root.file_id]
826
file_entry = new_inv["fileid"]
827
self.assertEqual("directory", root_entry.kind)
828
self.assertEqual(inv.root.file_id, root_entry.file_id)
829
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
830
self.assertEqual(inv.root.name, root_entry.name)
831
self.assertEqual("rootrev", root_entry.revision)
832
self.assertEqual("file", file_entry.kind)
833
self.assertEqual("fileid", file_entry.file_id)
834
self.assertEqual(inv.root.file_id, file_entry.parent_id)
835
self.assertEqual("file", file_entry.name)
836
self.assertEqual("filerev", file_entry.revision)
837
self.assertEqual("ffff", file_entry.text_sha1)
838
self.assertEqual(1, file_entry.text_size)
839
self.assertEqual(True, file_entry.executable)
840
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
842
def test_has_id_true(self):
844
inv.revision_id = "revid"
845
inv.root.revision = "rootrev"
846
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
847
inv["fileid"].revision = "filerev"
848
inv["fileid"].executable = True
849
inv["fileid"].text_sha1 = "ffff"
850
inv["fileid"].text_size = 1
851
chk_bytes = self.get_chk_bytes()
852
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
853
self.assertTrue(chk_inv.has_id('fileid'))
854
self.assertTrue(chk_inv.has_id(inv.root.file_id))
856
def test_has_id_not(self):
858
inv.revision_id = "revid"
859
inv.root.revision = "rootrev"
860
chk_bytes = self.get_chk_bytes()
861
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
862
self.assertFalse(chk_inv.has_id('fileid'))
864
def test_id2path(self):
866
inv.revision_id = "revid"
867
inv.root.revision = "rootrev"
868
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
869
fileentry = InventoryFile("fileid", "file", "dirid")
872
inv["fileid"].revision = "filerev"
873
inv["fileid"].executable = True
874
inv["fileid"].text_sha1 = "ffff"
875
inv["fileid"].text_size = 1
876
inv["dirid"].revision = "filerev"
877
chk_bytes = self.get_chk_bytes()
878
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
879
bytes = ''.join(chk_inv.to_lines())
880
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
881
self.assertEqual('', new_inv.id2path(inv.root.file_id))
882
self.assertEqual('dir', new_inv.id2path('dirid'))
883
self.assertEqual('dir/file', new_inv.id2path('fileid'))
885
def test_path2id(self):
887
inv.revision_id = "revid"
888
inv.root.revision = "rootrev"
889
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
890
fileentry = InventoryFile("fileid", "file", "dirid")
893
inv["fileid"].revision = "filerev"
894
inv["fileid"].executable = True
895
inv["fileid"].text_sha1 = "ffff"
896
inv["fileid"].text_size = 1
897
inv["dirid"].revision = "filerev"
898
chk_bytes = self.get_chk_bytes()
899
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
900
bytes = ''.join(chk_inv.to_lines())
901
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
902
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
903
self.assertEqual('dirid', new_inv.path2id('dir'))
904
self.assertEqual('fileid', new_inv.path2id('dir/file'))
906
def test_create_by_apply_delta_sets_root(self):
908
inv.revision_id = "revid"
909
chk_bytes = self.get_chk_bytes()
910
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
911
inv.add_path("", "directory", "myrootid", None)
912
inv.revision_id = "expectedid"
913
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
914
delta = [("", None, base_inv.root.file_id, None),
915
(None, "", "myrootid", inv.root)]
916
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
917
self.assertEquals(reference_inv.root, new_inv.root)
919
def test_create_by_apply_delta_empty_add_child(self):
921
inv.revision_id = "revid"
922
inv.root.revision = "rootrev"
923
chk_bytes = self.get_chk_bytes()
924
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
925
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
926
a_entry.revision = "filerev"
927
a_entry.executable = True
928
a_entry.text_sha1 = "ffff"
929
a_entry.text_size = 1
931
inv.revision_id = "expectedid"
932
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
933
delta = [(None, "A", "A-id", a_entry)]
934
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
935
# new_inv should be the same as reference_inv.
936
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
937
self.assertEqual(reference_inv.root_id, new_inv.root_id)
938
reference_inv.id_to_entry._ensure_root()
939
new_inv.id_to_entry._ensure_root()
940
self.assertEqual(reference_inv.id_to_entry._root_node._key,
941
new_inv.id_to_entry._root_node._key)
943
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
945
inv.revision_id = "revid"
946
inv.root.revision = "rootrev"
947
chk_bytes = self.get_chk_bytes()
948
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
949
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
950
a_entry.revision = "filerev"
951
a_entry.executable = True
952
a_entry.text_sha1 = "ffff"
953
a_entry.text_size = 1
955
inv.revision_id = "expectedid"
956
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
957
delta = [(None, "A", "A-id", a_entry)]
958
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
959
reference_inv.id_to_entry._ensure_root()
960
reference_inv.parent_id_basename_to_file_id._ensure_root()
961
new_inv.id_to_entry._ensure_root()
962
new_inv.parent_id_basename_to_file_id._ensure_root()
963
# new_inv should be the same as reference_inv.
964
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
965
self.assertEqual(reference_inv.root_id, new_inv.root_id)
966
self.assertEqual(reference_inv.id_to_entry._root_node._key,
967
new_inv.id_to_entry._root_node._key)
968
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
969
new_inv.parent_id_basename_to_file_id._root_node._key)
971
def test_iter_changes(self):
972
# Low level bootstrapping smoke test; comprehensive generic tests via
973
# InterTree are coming.
975
inv.revision_id = "revid"
976
inv.root.revision = "rootrev"
977
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
978
inv["fileid"].revision = "filerev"
979
inv["fileid"].executable = True
980
inv["fileid"].text_sha1 = "ffff"
981
inv["fileid"].text_size = 1
983
inv2.revision_id = "revid2"
984
inv2.root.revision = "rootrev"
985
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
986
inv2["fileid"].revision = "filerev2"
987
inv2["fileid"].executable = False
988
inv2["fileid"].text_sha1 = "bbbb"
989
inv2["fileid"].text_size = 2
991
chk_bytes = self.get_chk_bytes()
992
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
993
bytes = ''.join(chk_inv.to_lines())
994
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
995
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
996
bytes = ''.join(chk_inv2.to_lines())
997
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
998
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
999
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1001
list(inv_1.iter_changes(inv_2)))
1003
def test_parent_id_basename_to_file_id_index_enabled(self):
1005
inv.revision_id = "revid"
1006
inv.root.revision = "rootrev"
1007
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1008
inv["fileid"].revision = "filerev"
1009
inv["fileid"].executable = True
1010
inv["fileid"].text_sha1 = "ffff"
1011
inv["fileid"].text_size = 1
1012
# get fresh objects.
1013
chk_bytes = self.get_chk_bytes()
1014
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1015
bytes = ''.join(tmp_inv.to_lines())
1016
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1017
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1019
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1020
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1022
def test_file_entry_to_bytes(self):
1023
inv = CHKInventory(None)
1024
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1025
ie.executable = True
1026
ie.revision = 'file-rev-id'
1027
ie.text_sha1 = 'abcdefgh'
1029
bytes = inv._entry_to_bytes(ie)
1030
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1031
'file-rev-id\nabcdefgh\n100\nY', bytes)
1032
ie2 = inv._bytes_to_entry(bytes)
1033
self.assertEqual(ie, ie2)
1034
self.assertIsInstance(ie2.name, unicode)
1035
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1036
inv._bytes_to_utf8name_key(bytes))
1038
def test_file2_entry_to_bytes(self):
1039
inv = CHKInventory(None)
1041
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1042
ie.executable = False
1043
ie.revision = 'file-rev-id'
1044
ie.text_sha1 = '123456'
1046
bytes = inv._entry_to_bytes(ie)
1047
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1048
'file-rev-id\n123456\n25\nN', bytes)
1049
ie2 = inv._bytes_to_entry(bytes)
1050
self.assertEqual(ie, ie2)
1051
self.assertIsInstance(ie2.name, unicode)
1052
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1053
inv._bytes_to_utf8name_key(bytes))
1055
def test_dir_entry_to_bytes(self):
1056
inv = CHKInventory(None)
1057
ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1058
ie.revision = 'dir-rev-id'
1059
bytes = inv._entry_to_bytes(ie)
1060
self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1061
ie2 = inv._bytes_to_entry(bytes)
1062
self.assertEqual(ie, ie2)
1063
self.assertIsInstance(ie2.name, unicode)
1064
self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1065
inv._bytes_to_utf8name_key(bytes))
1067
def test_dir2_entry_to_bytes(self):
1068
inv = CHKInventory(None)
1069
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1071
ie.revision = 'dir-rev-id'
1072
bytes = inv._entry_to_bytes(ie)
1073
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1074
'dir-rev-id', bytes)
1075
ie2 = inv._bytes_to_entry(bytes)
1076
self.assertEqual(ie, ie2)
1077
self.assertIsInstance(ie2.name, unicode)
1078
self.assertIs(ie2.parent_id, None)
1079
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1080
inv._bytes_to_utf8name_key(bytes))
1082
def test_symlink_entry_to_bytes(self):
1083
inv = CHKInventory(None)
1084
ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1085
ie.revision = 'link-rev-id'
1086
ie.symlink_target = u'target/path'
1087
bytes = inv._entry_to_bytes(ie)
1088
self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1089
'link-rev-id\ntarget/path', bytes)
1090
ie2 = inv._bytes_to_entry(bytes)
1091
self.assertEqual(ie, ie2)
1092
self.assertIsInstance(ie2.name, unicode)
1093
self.assertIsInstance(ie2.symlink_target, unicode)
1094
self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1095
inv._bytes_to_utf8name_key(bytes))
1097
def test_symlink2_entry_to_bytes(self):
1098
inv = CHKInventory(None)
1099
ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1100
ie.revision = 'link-rev-id'
1101
ie.symlink_target = u'target/\u03a9path'
1102
bytes = inv._entry_to_bytes(ie)
1103
self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1104
'link-rev-id\ntarget/\xce\xa9path', bytes)
1105
ie2 = inv._bytes_to_entry(bytes)
1106
self.assertEqual(ie, ie2)
1107
self.assertIsInstance(ie2.name, unicode)
1108
self.assertIsInstance(ie2.symlink_target, unicode)
1109
self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1110
inv._bytes_to_utf8name_key(bytes))
1112
def test_tree_reference_entry_to_bytes(self):
1113
inv = CHKInventory(None)
1114
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1116
ie.revision = 'tree-rev-id'
1117
ie.reference_revision = 'ref-rev-id'
1118
bytes = inv._entry_to_bytes(ie)
1119
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1120
'tree-rev-id\nref-rev-id', bytes)
1121
ie2 = inv._bytes_to_entry(bytes)
1122
self.assertEqual(ie, ie2)
1123
self.assertIsInstance(ie2.name, unicode)
1124
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1125
inv._bytes_to_utf8name_key(bytes))