1
# Copyright (C) 2005-2010 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
28
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
29
InventoryDirectory, InventoryEntry, TreeReference)
30
from bzrlib.tests import (
32
TestCaseWithTransport,
35
split_suite_by_condition,
37
from bzrlib.tests.per_workingtree import workingtree_formats
40
def load_tests(standard_tests, module, loader):
41
"""Parameterise some inventory tests."""
42
to_adapt, result = split_suite_by_condition(standard_tests,
43
condition_isinstance(TestDeltaApplication))
45
('Inventory', {'apply_delta':apply_inventory_Inventory}),
47
# Working tree basis delta application
48
# Repository add_inv_by_delta.
49
# Reduced form of the per_repository test logic - that logic needs to
50
# be able to get /just/ repositories whereas these tests are fine with
51
# just creating trees.
53
for _, format in repository.format_registry.iteritems():
54
scenarios.append((str(format.__name__), {
55
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
57
for format in workingtree_formats():
59
(str(format.__class__.__name__) + ".update_basis_by_delta", {
60
'apply_delta':apply_inventory_WT_basis,
63
(str(format.__class__.__name__) + ".apply_inventory_delta", {
64
'apply_delta':apply_inventory_WT,
66
return multiply_tests(to_adapt, scenarios, result)
69
def create_texts_for_inv(repo, inv):
70
for path, ie in inv.iter_entries():
72
lines = ['a' * ie.text_size]
75
repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
77
def apply_inventory_Inventory(self, basis, delta):
78
"""Apply delta to basis and return the result.
80
:param basis: An inventory to be used as the basis.
81
:param delta: The inventory delta to apply:
82
:return: An inventory resulting from the application.
84
basis.apply_delta(delta)
88
def apply_inventory_WT(self, basis, delta):
89
"""Apply delta to basis and return the result.
91
This sets the tree state to be basis, and then calls apply_inventory_delta.
93
:param basis: An inventory to be used as the basis.
94
:param delta: The inventory delta to apply:
95
:return: An inventory resulting from the application.
97
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
98
control.create_repository()
99
control.create_branch()
100
tree = self.format.initialize(control)
103
tree._write_inventory(basis)
106
# Fresh object, reads disk again.
107
tree = tree.bzrdir.open_workingtree()
110
tree.apply_inventory_delta(delta)
113
# reload tree - ensure we get what was written.
114
tree = tree.bzrdir.open_workingtree()
116
self.addCleanup(tree.unlock)
117
# One could add 'tree._validate' here but that would cause 'early' failures
118
# as far as higher level code is concerned. Possibly adding an
119
# expect_fail parameter to this function and if that is False then do a
121
return tree.inventory
124
def apply_inventory_WT_basis(self, basis, delta):
125
"""Apply delta to basis and return the result.
127
This sets the parent and then calls update_basis_by_delta.
128
It also puts the basis in the repository under both 'basis' and 'result' to
129
allow safety checks made by the WT to succeed, and finally ensures that all
130
items in the delta with a new path are present in the WT before calling
131
update_basis_by_delta.
133
:param basis: An inventory to be used as the basis.
134
:param delta: The inventory delta to apply:
135
:return: An inventory resulting from the application.
137
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
138
control.create_repository()
139
control.create_branch()
140
tree = self.format.initialize(control)
143
repo = tree.branch.repository
144
repo.start_write_group()
146
rev = revision.Revision('basis', timestamp=0, timezone=None,
147
message="", committer="foo@example.com")
148
basis.revision_id = 'basis'
149
create_texts_for_inv(tree.branch.repository, basis)
150
repo.add_revision('basis', rev, basis)
151
# Add a revision for the result, with the basis content -
152
# update_basis_by_delta doesn't check that the delta results in
153
# result, and we want inconsistent deltas to get called on the
154
# tree, or else the code isn't actually checked.
155
rev = revision.Revision('result', timestamp=0, timezone=None,
156
message="", committer="foo@example.com")
157
basis.revision_id = 'result'
158
repo.add_revision('result', rev, basis)
159
repo.commit_write_group()
161
repo.abort_write_group()
163
# Set the basis state as the trees current state
164
tree._write_inventory(basis)
165
# This reads basis from the repo and puts it into the tree's local
166
# cache, if it has one.
167
tree.set_parent_ids(['basis'])
170
for old, new, id, entry in delta:
171
if None in (new, entry):
173
paths[new] = (entry.file_id, entry.kind)
174
parents.add(osutils.dirname(new))
175
parents = osutils.minimum_path_selection(parents)
177
# Put place holders in the tree to permit adding the other entries.
178
for pos, parent in enumerate(parents):
179
if not tree.path2id(parent):
180
# add a synthetic directory in the tree so we can put the
181
# tree0 entries in place for dirstate.
182
tree.add([parent], ["id%d" % pos], ["directory"])
184
# Many deltas may cause this mini-apply to fail, but we want to see what
185
# the delta application code says, not the prep that we do to deal with
186
# limitations of dirstate's update_basis code.
187
for path, (file_id, kind) in sorted(paths.items()):
189
tree.add([path], [file_id], [kind])
190
except (KeyboardInterrupt, SystemExit):
196
# Fresh lock, reads disk again.
199
tree.update_basis_by_delta('result', delta)
202
# reload tree - ensure we get what was written.
203
tree = tree.bzrdir.open_workingtree()
204
basis_tree = tree.basis_tree()
205
basis_tree.lock_read()
206
self.addCleanup(basis_tree.unlock)
207
# Note, that if the tree does not have a local cache, the trick above of
208
# setting the result as the basis, will come back to bite us. That said,
209
# all the implementations in bzr do have a local cache.
210
return basis_tree.inventory
213
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
214
"""Apply delta to basis and return the result.
216
This inserts basis as a whole inventory and then uses
217
add_inventory_by_delta to add delta.
219
:param basis: An inventory to be used as the basis.
220
:param delta: The inventory delta to apply:
221
:return: An inventory resulting from the application.
223
format = self.format()
224
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
225
repo = format.initialize(control)
228
repo.start_write_group()
230
rev = revision.Revision('basis', timestamp=0, timezone=None,
231
message="", committer="foo@example.com")
232
basis.revision_id = 'basis'
233
create_texts_for_inv(repo, basis)
234
repo.add_revision('basis', rev, basis)
235
repo.commit_write_group()
237
repo.abort_write_group()
243
repo.start_write_group()
245
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
248
repo.abort_write_group()
251
repo.commit_write_group()
254
# Fresh lock, reads disk again.
255
repo = repo.bzrdir.open_repository()
257
self.addCleanup(repo.unlock)
258
return repo.get_inventory('result')
261
class TestInventoryUpdates(TestCase):
263
def test_creation_from_root_id(self):
    """Passing a root id to the constructor creates the root directory."""
    new_inv = inventory.Inventory(root_id='tree-root')
    root_entry = new_inv.root
    # A root entry must exist and carry the id handed to the constructor.
    self.assertNotEqual(None, root_entry)
    self.assertEqual('tree-root', root_entry.file_id)
269
def test_add_path_of_root(self):
    """Adding the empty path to a rootless inventory installs the root."""
    rootless = inventory.Inventory(root_id=None)
    # Without a root id at creation time there is no root directory.
    self.assertIs(None, rootless.root)
    # Adding the '' path supplies the root entry.
    root_entry = rootless.add_path("", "directory", "my-root")
    root_entry.revision = 'test-rev'
    self.assertEqual("my-root", root_entry.file_id)
    # The returned entry is the very object exposed as the inventory root.
    self.assertIs(root_entry, rootless.root)
279
def test_add_path(self):
    """add_path returns an entry carrying the requested file id and kind."""
    tree_inv = inventory.Inventory(root_id='tree_root')
    added = tree_inv.add_path('hello', 'file', 'hello-id')
    self.assertEqual('hello-id', added.file_id)
    self.assertEqual('file', added.kind)
286
"""Make sure copy() works and creates a deep copy."""
287
inv = inventory.Inventory(root_id='some-tree-root')
288
ie = inv.add_path('hello', 'file', 'hello-id')
290
inv.root.file_id = 'some-new-root'
292
self.assertEqual('some-tree-root', inv2.root.file_id)
293
self.assertEqual('hello', inv2['hello-id'].name)
295
def test_copy_empty(self):
296
"""Make sure an empty inventory can be copied."""
297
inv = inventory.Inventory(root_id=None)
299
self.assertIs(None, inv2.root)
301
def test_copy_copies_root_revision(self):
302
"""Make sure the revision of the root gets copied."""
303
inv = inventory.Inventory(root_id='someroot')
304
inv.root.revision = 'therev'
306
self.assertEquals('someroot', inv2.root.file_id)
307
self.assertEquals('therev', inv2.root.revision)
309
def test_create_tree_reference(self):
    """Adding a TreeReference entry under the root must not raise."""
    inv = inventory.Inventory('tree-root-123')
    nested = TreeReference('nested-id', 'nested', parent_id='tree-root-123',
                           revision='rev', reference_revision='rev2')
    inv.add(nested)
314
def test_error_encoding(self):
    """The error for a clashing non-ascii name can be rendered with str().

    Two files with the same unicode name are added under the same parent;
    the second add must raise InconsistentDelta, and the string form of
    that error must show the name in escaped form.
    """
    inv = inventory.Inventory('tree-root')
    inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
    clashing = InventoryFile('b-id', u'\u1234', 'tree-root')
    err = self.assertRaises(errors.InconsistentDelta, inv.add, clashing)
    self.assertContainsRe(str(err), r'\\u1234')
321
def test_add_recursive(self):
322
parent = InventoryDirectory('src-id', 'src', 'tree-root')
323
child = InventoryFile('hello-id', 'hello.c', 'src-id')
324
parent.children[child.file_id] = child
325
inv = inventory.Inventory('tree-root')
327
self.assertEqual('src/hello.c', inv.id2path('hello-id'))
331
class TestDeltaApplication(TestCaseWithTransport):
333
def get_empty_inventory(self, reference_inv=None):
334
"""Get an empty inventory.
336
Note that tests should not depend on the revision of the root for
337
setting up test conditions, as it has to be flexible to accommodate non
338
rich root repositories.
340
:param reference_inv: If not None, get the revision for the root from
341
this inventory. This is useful for dealing with older repositories
342
that routinely discarded the root entry data. If None, the root's
343
revision is set to 'basis'.
345
inv = inventory.Inventory()
346
if reference_inv is not None:
347
inv.root.revision = reference_inv.root.revision
349
inv.root.revision = 'basis'
352
def test_empty_delta(self):
353
inv = self.get_empty_inventory()
355
inv = self.apply_delta(self, inv, delta)
356
inv2 = self.get_empty_inventory(inv)
357
self.assertEqual([], inv2._make_delta(inv))
359
def test_None_file_id(self):
360
inv = self.get_empty_inventory()
361
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
362
dir1.revision = 'result'
363
delta = [(None, u'dir1', None, dir1)]
364
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
367
def test_unicode_file_id(self):
368
inv = self.get_empty_inventory()
369
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
370
dir1.revision = 'result'
371
delta = [(None, u'dir1', dir1.file_id, dir1)]
372
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
375
def test_repeated_file_id(self):
376
inv = self.get_empty_inventory()
377
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
378
file1.revision = 'result'
381
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
382
file2.revision = 'result'
385
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
386
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
389
def test_repeated_new_path(self):
390
inv = self.get_empty_inventory()
391
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
392
file1.revision = 'result'
395
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
396
file2.revision = 'result'
399
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
400
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
403
def test_repeated_old_path(self):
404
inv = self.get_empty_inventory()
405
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
406
file1.revision = 'result'
409
# We can't *create* a source inventory with the same path, but
410
# a badly generated partial delta might claim the same source twice.
411
# This would be buggy in two ways: the path is repeated in the delta,
412
# And the path for one of the file ids doesn't match the source
413
# location. Alternatively, we could have a repeated fileid, but that
414
# is separately checked for.
415
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
416
file2.revision = 'result'
421
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
422
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
425
def test_mismatched_id_entry_id(self):
426
inv = self.get_empty_inventory()
427
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
428
file1.revision = 'result'
431
delta = [(None, u'path', 'id', file1)]
432
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
435
def test_mismatched_new_path_entry_None(self):
436
inv = self.get_empty_inventory()
437
delta = [(None, u'path', 'id', None)]
438
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
441
def test_mismatched_new_path_None_entry(self):
442
inv = self.get_empty_inventory()
443
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
444
file1.revision = 'result'
447
delta = [(u"path", None, 'id1', file1)]
448
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
451
def test_parent_is_not_directory(self):
452
inv = self.get_empty_inventory()
453
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
454
file1.revision = 'result'
457
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
458
file2.revision = 'result'
462
delta = [(None, u'path/path2', 'id2', file2)]
463
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
466
def test_parent_is_missing(self):
467
inv = self.get_empty_inventory()
468
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
469
file2.revision = 'result'
472
delta = [(None, u'path/path2', 'id2', file2)]
473
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
476
def test_new_parent_path_has_wrong_id(self):
477
inv = self.get_empty_inventory()
478
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
479
parent1.revision = 'result'
480
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
481
parent2.revision = 'result'
482
file1 = inventory.InventoryFile('id', 'path', 'p-2')
483
file1.revision = 'result'
488
# This delta claims that file1 is at dir/path, but actually its at
489
# dir2/path if you follow the inventory parent structure.
490
delta = [(None, u'dir/path', 'id', file1)]
491
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
494
def test_old_parent_path_is_wrong(self):
495
inv = self.get_empty_inventory()
496
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
497
parent1.revision = 'result'
498
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
499
parent2.revision = 'result'
500
file1 = inventory.InventoryFile('id', 'path', 'p-2')
501
file1.revision = 'result'
507
# This delta claims that file1 was at dir/path, but actually it was at
508
# dir2/path if you follow the inventory parent structure.
509
delta = [(u'dir/path', None, 'id', None)]
510
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
513
def test_old_parent_path_is_for_other_id(self):
514
inv = self.get_empty_inventory()
515
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
516
parent1.revision = 'result'
517
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
518
parent2.revision = 'result'
519
file1 = inventory.InventoryFile('id', 'path', 'p-2')
520
file1.revision = 'result'
523
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
524
file2.revision = 'result'
531
# This delta claims that file1 was at dir/path, but actually it was at
532
# dir2/path if you follow the inventory parent structure. At dir/path
533
# is another entry we should not delete.
534
delta = [(u'dir/path', None, 'id', None)]
535
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
538
def test_add_existing_id_new_path(self):
539
inv = self.get_empty_inventory()
540
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
541
parent1.revision = 'result'
542
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
543
parent2.revision = 'result'
545
delta = [(None, u'dir2', 'p-1', parent2)]
546
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
549
def test_add_new_id_existing_path(self):
550
inv = self.get_empty_inventory()
551
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
552
parent1.revision = 'result'
553
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
554
parent2.revision = 'result'
556
delta = [(None, u'dir1', 'p-2', parent2)]
557
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
560
def test_remove_dir_leaving_dangling_child(self):
561
inv = self.get_empty_inventory()
562
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
563
dir1.revision = 'result'
564
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
565
dir2.revision = 'result'
566
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
567
dir3.revision = 'result'
571
delta = [(u'dir1', None, 'p-1', None),
572
(u'dir1/child2', None, 'p-3', None)]
573
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from cStringIO import StringIO
21
from bzrlib import errors, inventory, osutils
22
from bzrlib.branch import Branch
23
from bzrlib.diff import internal_diff
24
from bzrlib.inventory import (Inventory, ROOT_ID, InventoryFile,
25
InventoryDirectory, InventoryEntry)
26
from bzrlib.osutils import (has_symlinks, rename, pathjoin, is_inside_any,
27
is_inside_or_parent_of_any)
28
from bzrlib.tests import TestCase, TestCaseWithTransport
29
from bzrlib.transform import TreeTransform
30
from bzrlib.uncommit import uncommit
577
33
class TestInventory(TestCase):
579
def test_is_root(self):
580
"""Ensure our root-checking code is accurate."""
581
inv = inventory.Inventory('TREE_ROOT')
582
self.assertTrue(inv.is_root('TREE_ROOT'))
583
self.assertFalse(inv.is_root('booga'))
584
inv.root.file_id = 'booga'
585
self.assertFalse(inv.is_root('TREE_ROOT'))
586
self.assertTrue(inv.is_root('booga'))
587
# works properly even if no root is set
589
self.assertFalse(inv.is_root('TREE_ROOT'))
590
self.assertFalse(inv.is_root('booga'))
592
def test_entries_for_empty_inventory(self):
    """entries() on an inventory created without a root returns []."""
    rootless = Inventory(root_id=None)
    listed = rootless.entries()
    self.assertEqual([], listed)
35
def test_is_within(self):
37
SRC_FOO_C = pathjoin('src', 'foo.c')
38
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
42
self.assert_(is_inside_any(dirs, fn))
44
for dirs, fn in [(['src'], 'srccontrol'),
45
(['src'], 'srccontrol/foo')]:
46
self.assertFalse(is_inside_any(dirs, fn))
48
def test_is_within_or_parent(self):
49
for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
50
(['src'], 'src/foo.c'),
51
(['src/bar.c'], 'src'),
52
(['src/bar.c', 'bla/foo.c'], 'src'),
55
self.assert_(is_inside_or_parent_of_any(dirs, fn))
57
for dirs, fn in [(['src'], 'srccontrol'),
58
(['srccontrol/foo.c'], 'src'),
59
(['src'], 'srccontrol/foo')]:
60
self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
63
"""Test detection of files within selected directories."""
66
for args in [('src', 'directory', 'src-id'),
67
('doc', 'directory', 'doc-id'),
68
('src/hello.c', 'file'),
69
('src/bye.c', 'file', 'bye-id'),
70
('Makefile', 'file')]:
73
self.assertEqual(inv.path2id('src'), 'src-id')
74
self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
76
self.assert_('src-id' in inv)
78
def test_iter_entries(self):
81
for args in [('src', 'directory', 'src-id'),
82
('doc', 'directory', 'doc-id'),
83
('src/hello.c', 'file', 'hello-id'),
84
('src/bye.c', 'file', 'bye-id'),
85
('Makefile', 'file', 'makefile-id')]:
89
('Makefile', 'makefile-id'),
92
('src/bye.c', 'bye-id'),
93
('src/hello.c', 'hello-id'),
94
], [(path, ie.file_id) for path, ie in inv.iter_entries()])
96
def test_iter_entries_by_dir(self):
99
for args in [('src', 'directory', 'src-id'),
100
('doc', 'directory', 'doc-id'),
101
('src/hello.c', 'file', 'hello-id'),
102
('src/bye.c', 'file', 'bye-id'),
103
('zz', 'file', 'zz-id'),
104
('src/sub/', 'directory', 'sub-id'),
105
('src/zz.c', 'file', 'zzc-id'),
106
('src/sub/a', 'file', 'a-id'),
107
('Makefile', 'file', 'makefile-id')]:
111
('Makefile', 'makefile-id'),
115
('src/bye.c', 'bye-id'),
116
('src/hello.c', 'hello-id'),
117
('src/sub', 'sub-id'),
118
('src/zz.c', 'zzc-id'),
119
('src/sub/a', 'a-id'),
120
], [(path, ie.file_id) for path, ie in inv.iter_entries_by_dir()])
122
def test_version(self):
123
"""Inventory remembers the text's version."""
125
ie = inv.add_path('foo.txt', 'file')
598
129
class TestInventoryEntry(TestCase):
676
215
osutils.normalized_filename = orig_normalized_filename
218
class TestEntryDiffing(TestCaseWithTransport):
221
super(TestEntryDiffing, self).setUp()
222
self.wt = self.make_branch_and_tree('.')
223
self.branch = self.wt.branch
224
print >> open('file', 'wb'), 'foo'
225
print >> open('binfile', 'wb'), 'foo'
226
self.wt.add(['file'], ['fileid'])
227
self.wt.add(['binfile'], ['binfileid'])
229
os.symlink('target1', 'symlink')
230
self.wt.add(['symlink'], ['linkid'])
231
self.wt.commit('message_1', rev_id = '1')
232
print >> open('file', 'wb'), 'bar'
233
print >> open('binfile', 'wb'), 'x' * 1023 + '\x00'
236
os.symlink('target2', 'symlink')
237
self.tree_1 = self.branch.repository.revision_tree('1')
238
self.inv_1 = self.branch.repository.get_inventory('1')
239
self.file_1 = self.inv_1['fileid']
240
self.file_1b = self.inv_1['binfileid']
241
self.tree_2 = self.wt
242
self.inv_2 = self.tree_2.read_working_inventory()
243
self.file_2 = self.inv_2['fileid']
244
self.file_2b = self.inv_2['binfileid']
246
self.link_1 = self.inv_1['linkid']
247
self.link_2 = self.inv_2['linkid']
249
def test_file_diff_deleted(self):
251
self.file_1.diff(internal_diff,
252
"old_label", self.tree_1,
253
"/dev/null", None, None,
255
self.assertEqual(output.getvalue(), "--- old_label\n"
261
def test_file_diff_added(self):
263
self.file_1.diff(internal_diff,
264
"new_label", self.tree_1,
265
"/dev/null", None, None,
266
output, reverse=True)
267
self.assertEqual(output.getvalue(), "--- /dev/null\n"
273
def test_file_diff_changed(self):
275
self.file_1.diff(internal_diff,
276
"/dev/null", self.tree_1,
277
"new_label", self.file_2, self.tree_2,
279
self.assertEqual(output.getvalue(), "--- /dev/null\n"
286
def test_file_diff_binary(self):
288
self.file_1.diff(internal_diff,
289
"/dev/null", self.tree_1,
290
"new_label", self.file_2b, self.tree_2,
292
self.assertEqual(output.getvalue(),
293
"Binary files /dev/null and new_label differ\n")
294
def test_link_diff_deleted(self):
295
if not has_symlinks():
298
self.link_1.diff(internal_diff,
299
"old_label", self.tree_1,
300
"/dev/null", None, None,
302
self.assertEqual(output.getvalue(),
303
"=== target was 'target1'\n")
305
def test_link_diff_added(self):
306
if not has_symlinks():
309
self.link_1.diff(internal_diff,
310
"new_label", self.tree_1,
311
"/dev/null", None, None,
312
output, reverse=True)
313
self.assertEqual(output.getvalue(),
314
"=== target is 'target1'\n")
316
def test_link_diff_changed(self):
317
if not has_symlinks():
320
self.link_1.diff(internal_diff,
321
"/dev/null", self.tree_1,
322
"new_label", self.link_2, self.tree_2,
324
self.assertEqual(output.getvalue(),
325
"=== target changed 'target1' => 'target2'\n")
328
class TestSnapshot(TestCaseWithTransport):
331
# for full testing we'll need a branch
332
# with a subdir to test parent changes.
333
# and a file, link and dir under that.
334
# but right now I only need one attribute
335
# to change, and then test merge patterns
336
# with fake parent entries.
337
super(TestSnapshot, self).setUp()
338
self.wt = self.make_branch_and_tree('.')
339
self.branch = self.wt.branch
340
self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
341
self.wt.add(['subdir', 'subdir/file'],
345
self.wt.commit('message_1', rev_id = '1')
346
self.tree_1 = self.branch.repository.revision_tree('1')
347
self.inv_1 = self.branch.repository.get_inventory('1')
348
self.file_1 = self.inv_1['fileid']
349
self.file_active = self.wt.inventory['fileid']
350
self.builder = self.branch.get_commit_builder([], timestamp=time.time(), revision_id='2')
352
def test_snapshot_new_revision(self):
353
# This tests that a simple commit with no parents makes a new
354
# revision value in the inventory entry
355
self.file_active.snapshot('2', 'subdir/file', {}, self.wt, self.builder)
356
# expected outcome - file_1 has a revision id of '2', and we can get
357
# its text of 'file contents' out of the weave.
358
self.assertEqual(self.file_1.revision, '1')
359
self.assertEqual(self.file_active.revision, '2')
360
# this should be a separate test probably, but lets check it once..
361
lines = self.branch.repository.weave_store.get_weave(
363
self.branch.get_transaction()).get_lines('2')
364
self.assertEqual(lines, ['contents of subdir/file\n'])
366
def test_snapshot_unchanged(self):
367
#This tests that a simple commit does not make a new entry for
368
# an unchanged inventory entry
369
self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
370
self.wt, self.builder)
371
self.assertEqual(self.file_1.revision, '1')
372
self.assertEqual(self.file_active.revision, '1')
373
vf = self.branch.repository.weave_store.get_weave(
375
self.branch.repository.get_transaction())
376
self.assertRaises(errors.RevisionNotPresent,
380
def test_snapshot_merge_identical_different_revid(self):
    """Identical parents with differing revision ids give a new revision.

    This tests that a commit with two identical parents, one of which has
    a different revision id, results in a new revision id in the entry.
    """
    # 1->other, commit a merge of other against 1, results in 2.
    # Build a second parent entry that matches file_1 field for field.
    other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
    other_ie.revision = '1'
    other_ie.text_sha1 = self.file_1.text_sha1
    other_ie.text_size = self.file_1.text_size
    # Sanity check: with the same revision the two entries compare equal,
    # and changing only the revision makes them differ.
    self.assertEqual(self.file_1, other_ie)
    other_ie.revision = 'other'
    self.assertNotEqual(self.file_1, other_ie)
    # Register the 'other' text version in the file's weave so both parent
    # revisions are present when the snapshot is taken.
    versionfile = self.branch.repository.weave_store.get_weave(
        'fileid', self.branch.repository.get_transaction())
    versionfile.clone_text('other', '1', ['1'])
    self.file_active.snapshot('2', 'subdir/file',
                              {'1':self.file_1, 'other':other_ie},
                              self.wt, self.builder)
    self.assertEqual(self.file_active.revision, '2')
400
def test_snapshot_changed(self):
    """A commit with one differing parent gives the entry a new revision id."""
    active = self.file_active
    # Rename the entry and the file on disk so it differs from its parent.
    active.name = 'newname'
    rename('subdir/file', 'subdir/newname')
    parents = {'1':self.file_1}
    active.snapshot('2', 'subdir/newname', parents, self.wt, self.builder)
    # Expected outcome: the active entry now has a revision id of '2'.
    self.assertEqual(active.revision, '2')
411
class TestPreviousHeads(TestCaseWithTransport):
414
# we want several inventories, that respectively
415
# give us the following scenarios:
416
# A) fileid not in any inventory (A),
417
# B) fileid present in one inventory (B) and (A,B)
418
# C) fileid present in two inventories, and they
419
# are not mutual descendents (B, C)
420
# D) fileid present in two inventories and one is
421
# a descendent of the other. (B, D)
422
super(TestPreviousHeads, self).setUp()
423
self.wt = self.make_branch_and_tree('.')
424
self.branch = self.wt.branch
425
self.build_tree(['file'])
426
self.wt.commit('new branch', allow_pointless=True, rev_id='A')
427
self.inv_A = self.branch.repository.get_inventory('A')
428
self.wt.add(['file'], ['fileid'])
429
self.wt.commit('add file', rev_id='B')
430
self.inv_B = self.branch.repository.get_inventory('B')
431
uncommit(self.branch, tree=self.wt)
432
self.assertEqual(self.branch.revision_history(), ['A'])
433
self.wt.commit('another add of file', rev_id='C')
434
self.inv_C = self.branch.repository.get_inventory('C')
435
self.wt.add_pending_merge('B')
436
self.wt.commit('merge in B', rev_id='D')
437
self.inv_D = self.branch.repository.get_inventory('D')
438
self.file_active = self.wt.inventory['fileid']
439
self.weave = self.branch.repository.weave_store.get_weave('fileid',
440
self.branch.repository.get_transaction())
442
def get_previous_heads(self, inventories):
443
return self.file_active.find_previous_heads(
445
self.branch.repository.weave_store,
446
self.branch.repository.get_transaction())
448
def test_fileid_in_no_inventory(self):
    """Searching only inventory A yields no previous heads."""
    heads = self.get_previous_heads([self.inv_A])
    self.assertEqual({}, heads)
451
def test_fileid_in_one_inventory(self):
    """Only B's entry is reported, wherever inventory A sits in the list."""
    orderings = (
        [self.inv_B],
        [self.inv_A, self.inv_B],
        [self.inv_B, self.inv_A],
        )
    for inventories in orderings:
        self.assertEqual({'B':self.inv_B['fileid']},
                         self.get_previous_heads(inventories))
459
def test_fileid_in_two_inventories_gives_both_entries(self):
    """Both B's and C's entries are reported, in either argument order."""
    orderings = (
        [self.inv_B, self.inv_C],
        [self.inv_C, self.inv_B],
        )
    for inventories in orderings:
        self.assertEqual({'B':self.inv_B['fileid'],
                          'C':self.inv_C['fileid']},
                         self.get_previous_heads(inventories))
467
def test_fileid_in_two_inventories_already_merged_gives_head(self):
    """Only D's entry is reported for B and D, in either argument order."""
    orderings = (
        [self.inv_B, self.inv_D],
        [self.inv_D, self.inv_B],
        )
    for inventories in orderings:
        self.assertEqual({'D':self.inv_D['fileid']},
                         self.get_previous_heads(inventories))
473
# TODO: test two inventories with the same file revision
679
476
class TestDescribeChanges(TestCase):
681
478
def test_describe_change(self):
734
531
self.assertEqual(expected_change, change)
737
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
739
def get_chk_bytes(self):
    """Return the store built by a groupcompress pack factory.

    The factory comes from groupcompress.make_pack_factory(True, True, 1)
    and is applied to the transport returned by self.get_transport('').
    """
    make_store = groupcompress.make_pack_factory(True, True, 1)
    return make_store(self.get_transport(''))
744
def read_bytes(self, chk_bytes, key):
    """Return the "fulltext" bytes of the first record streamed for key.

    :param chk_bytes: Object providing get_record_stream.
    :param key: The single key to request from chk_bytes.
    :return: The first streamed record rendered via get_bytes_as("fulltext").
    """
    records = chk_bytes.get_record_stream([key], 'unordered', True)
    first_record = records.next()
    return first_record.get_bytes_as("fulltext")
748
def test_deserialise_gives_CHKInventory(self):
750
inv.revision_id = "revid"
751
inv.root.revision = "rootrev"
752
chk_bytes = self.get_chk_bytes()
753
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
754
bytes = ''.join(chk_inv.to_lines())
755
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
756
self.assertEqual("revid", new_inv.revision_id)
757
self.assertEqual("directory", new_inv.root.kind)
758
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
759
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
760
self.assertEqual(inv.root.name, new_inv.root.name)
761
self.assertEqual("rootrev", new_inv.root.revision)
762
self.assertEqual('plain', new_inv._search_key_name)
764
def test_deserialise_wrong_revid(self):
766
inv.revision_id = "revid"
767
inv.root.revision = "rootrev"
768
chk_bytes = self.get_chk_bytes()
769
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
770
bytes = ''.join(chk_inv.to_lines())
771
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
774
def test_captures_rev_root_byid(self):
776
inv.revision_id = "foo"
777
inv.root.revision = "bar"
778
chk_bytes = self.get_chk_bytes()
779
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
780
lines = chk_inv.to_lines()
783
'revision_id: foo\n',
784
'root_id: TREE_ROOT\n',
785
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
786
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
788
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
789
self.assertEqual('plain', chk_inv._search_key_name)
791
def test_captures_parent_id_basename_index(self):
793
inv.revision_id = "foo"
794
inv.root.revision = "bar"
795
chk_bytes = self.get_chk_bytes()
796
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
797
lines = chk_inv.to_lines()
800
'revision_id: foo\n',
801
'root_id: TREE_ROOT\n',
802
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
803
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
805
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
806
self.assertEqual('plain', chk_inv._search_key_name)
808
def test_captures_search_key_name(self):
810
inv.revision_id = "foo"
811
inv.root.revision = "bar"
812
chk_bytes = self.get_chk_bytes()
813
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
814
search_key_name='hash-16-way')
815
lines = chk_inv.to_lines()
818
'search_key_name: hash-16-way\n',
819
'root_id: TREE_ROOT\n',
820
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
821
'revision_id: foo\n',
822
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
824
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
825
self.assertEqual('hash-16-way', chk_inv._search_key_name)
827
def test_directory_children_on_demand(self):
829
inv.revision_id = "revid"
830
inv.root.revision = "rootrev"
831
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
832
inv["fileid"].revision = "filerev"
833
inv["fileid"].executable = True
834
inv["fileid"].text_sha1 = "ffff"
835
inv["fileid"].text_size = 1
836
chk_bytes = self.get_chk_bytes()
837
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
838
bytes = ''.join(chk_inv.to_lines())
839
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
840
root_entry = new_inv[inv.root.file_id]
841
self.assertEqual(None, root_entry._children)
842
self.assertEqual(['file'], root_entry.children.keys())
843
file_direct = new_inv["fileid"]
844
file_found = root_entry.children['file']
845
self.assertEqual(file_direct.kind, file_found.kind)
846
self.assertEqual(file_direct.file_id, file_found.file_id)
847
self.assertEqual(file_direct.parent_id, file_found.parent_id)
848
self.assertEqual(file_direct.name, file_found.name)
849
self.assertEqual(file_direct.revision, file_found.revision)
850
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
851
self.assertEqual(file_direct.text_size, file_found.text_size)
852
self.assertEqual(file_direct.executable, file_found.executable)
854
def test_from_inventory_maximum_size(self):
855
# from_inventory supports the maximum_size parameter.
857
inv.revision_id = "revid"
858
inv.root.revision = "rootrev"
859
chk_bytes = self.get_chk_bytes()
860
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
861
chk_inv.id_to_entry._ensure_root()
862
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
863
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
864
p_id_basename = chk_inv.parent_id_basename_to_file_id
865
p_id_basename._ensure_root()
866
self.assertEqual(120, p_id_basename._root_node.maximum_size)
867
self.assertEqual(2, p_id_basename._root_node._key_width)
869
def test___iter__(self):
871
inv.revision_id = "revid"
872
inv.root.revision = "rootrev"
873
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
874
inv["fileid"].revision = "filerev"
875
inv["fileid"].executable = True
876
inv["fileid"].text_sha1 = "ffff"
877
inv["fileid"].text_size = 1
878
chk_bytes = self.get_chk_bytes()
879
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
880
bytes = ''.join(chk_inv.to_lines())
881
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
882
fileids = list(new_inv.__iter__())
884
self.assertEqual([inv.root.file_id, "fileid"], fileids)
886
def test__len__(self):
888
inv.revision_id = "revid"
889
inv.root.revision = "rootrev"
890
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
891
inv["fileid"].revision = "filerev"
892
inv["fileid"].executable = True
893
inv["fileid"].text_sha1 = "ffff"
894
inv["fileid"].text_size = 1
895
chk_bytes = self.get_chk_bytes()
896
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
897
self.assertEqual(2, len(chk_inv))
899
def test___getitem__(self):
901
inv.revision_id = "revid"
902
inv.root.revision = "rootrev"
903
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
904
inv["fileid"].revision = "filerev"
905
inv["fileid"].executable = True
906
inv["fileid"].text_sha1 = "ffff"
907
inv["fileid"].text_size = 1
908
chk_bytes = self.get_chk_bytes()
909
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
910
bytes = ''.join(chk_inv.to_lines())
911
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
912
root_entry = new_inv[inv.root.file_id]
913
file_entry = new_inv["fileid"]
914
self.assertEqual("directory", root_entry.kind)
915
self.assertEqual(inv.root.file_id, root_entry.file_id)
916
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
917
self.assertEqual(inv.root.name, root_entry.name)
918
self.assertEqual("rootrev", root_entry.revision)
919
self.assertEqual("file", file_entry.kind)
920
self.assertEqual("fileid", file_entry.file_id)
921
self.assertEqual(inv.root.file_id, file_entry.parent_id)
922
self.assertEqual("file", file_entry.name)
923
self.assertEqual("filerev", file_entry.revision)
924
self.assertEqual("ffff", file_entry.text_sha1)
925
self.assertEqual(1, file_entry.text_size)
926
self.assertEqual(True, file_entry.executable)
927
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
929
def test_has_id_true(self):
931
inv.revision_id = "revid"
932
inv.root.revision = "rootrev"
933
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
934
inv["fileid"].revision = "filerev"
935
inv["fileid"].executable = True
936
inv["fileid"].text_sha1 = "ffff"
937
inv["fileid"].text_size = 1
938
chk_bytes = self.get_chk_bytes()
939
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
940
self.assertTrue(chk_inv.has_id('fileid'))
941
self.assertTrue(chk_inv.has_id(inv.root.file_id))
943
def test_has_id_not(self):
945
inv.revision_id = "revid"
946
inv.root.revision = "rootrev"
947
chk_bytes = self.get_chk_bytes()
948
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
949
self.assertFalse(chk_inv.has_id('fileid'))
951
def test_id2path(self):
953
inv.revision_id = "revid"
954
inv.root.revision = "rootrev"
955
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
956
fileentry = InventoryFile("fileid", "file", "dirid")
959
inv["fileid"].revision = "filerev"
960
inv["fileid"].executable = True
961
inv["fileid"].text_sha1 = "ffff"
962
inv["fileid"].text_size = 1
963
inv["dirid"].revision = "filerev"
964
chk_bytes = self.get_chk_bytes()
965
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
966
bytes = ''.join(chk_inv.to_lines())
967
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
968
self.assertEqual('', new_inv.id2path(inv.root.file_id))
969
self.assertEqual('dir', new_inv.id2path('dirid'))
970
self.assertEqual('dir/file', new_inv.id2path('fileid'))
972
def test_path2id(self):
974
inv.revision_id = "revid"
975
inv.root.revision = "rootrev"
976
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
977
fileentry = InventoryFile("fileid", "file", "dirid")
980
inv["fileid"].revision = "filerev"
981
inv["fileid"].executable = True
982
inv["fileid"].text_sha1 = "ffff"
983
inv["fileid"].text_size = 1
984
inv["dirid"].revision = "filerev"
985
chk_bytes = self.get_chk_bytes()
986
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
987
bytes = ''.join(chk_inv.to_lines())
988
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
989
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
990
self.assertEqual('dirid', new_inv.path2id('dir'))
991
self.assertEqual('fileid', new_inv.path2id('dir/file'))
993
def test_create_by_apply_delta_sets_root(self):
995
inv.revision_id = "revid"
996
chk_bytes = self.get_chk_bytes()
997
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
998
inv.add_path("", "directory", "myrootid", None)
999
inv.revision_id = "expectedid"
1000
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1001
delta = [("", None, base_inv.root.file_id, None),
1002
(None, "", "myrootid", inv.root)]
1003
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1004
self.assertEquals(reference_inv.root, new_inv.root)
1006
def test_create_by_apply_delta_empty_add_child(self):
1008
inv.revision_id = "revid"
1009
inv.root.revision = "rootrev"
1010
chk_bytes = self.get_chk_bytes()
1011
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1012
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1013
a_entry.revision = "filerev"
1014
a_entry.executable = True
1015
a_entry.text_sha1 = "ffff"
1016
a_entry.text_size = 1
1018
inv.revision_id = "expectedid"
1019
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1020
delta = [(None, "A", "A-id", a_entry)]
1021
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1022
# new_inv should be the same as reference_inv.
1023
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1024
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1025
reference_inv.id_to_entry._ensure_root()
1026
new_inv.id_to_entry._ensure_root()
1027
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1028
new_inv.id_to_entry._root_node._key)
1030
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1032
inv.revision_id = "revid"
1033
inv.root.revision = "rootrev"
1034
chk_bytes = self.get_chk_bytes()
1035
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1036
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1037
a_entry.revision = "filerev"
1038
a_entry.executable = True
1039
a_entry.text_sha1 = "ffff"
1040
a_entry.text_size = 1
1042
inv.revision_id = "expectedid"
1043
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1044
delta = [(None, "A", "A-id", a_entry)]
1045
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1046
reference_inv.id_to_entry._ensure_root()
1047
reference_inv.parent_id_basename_to_file_id._ensure_root()
1048
new_inv.id_to_entry._ensure_root()
1049
new_inv.parent_id_basename_to_file_id._ensure_root()
1050
# new_inv should be the same as reference_inv.
1051
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1052
self.assertEqual(reference_inv.root_id, new_inv.root_id)
1053
self.assertEqual(reference_inv.id_to_entry._root_node._key,
1054
new_inv.id_to_entry._root_node._key)
1055
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1056
new_inv.parent_id_basename_to_file_id._root_node._key)
1058
def test_iter_changes(self):
1059
# Low level bootstrapping smoke test; comprehensive generic tests via
1060
# InterTree are coming.
1062
inv.revision_id = "revid"
1063
inv.root.revision = "rootrev"
1064
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1065
inv["fileid"].revision = "filerev"
1066
inv["fileid"].executable = True
1067
inv["fileid"].text_sha1 = "ffff"
1068
inv["fileid"].text_size = 1
1070
inv2.revision_id = "revid2"
1071
inv2.root.revision = "rootrev"
1072
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1073
inv2["fileid"].revision = "filerev2"
1074
inv2["fileid"].executable = False
1075
inv2["fileid"].text_sha1 = "bbbb"
1076
inv2["fileid"].text_size = 2
1077
# get fresh objects.
1078
chk_bytes = self.get_chk_bytes()
1079
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1080
bytes = ''.join(chk_inv.to_lines())
1081
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1082
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1083
bytes = ''.join(chk_inv2.to_lines())
1084
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1085
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1086
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1088
list(inv_1.iter_changes(inv_2)))
1090
def test_parent_id_basename_to_file_id_index_enabled(self):
1092
inv.revision_id = "revid"
1093
inv.root.revision = "rootrev"
1094
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1095
inv["fileid"].revision = "filerev"
1096
inv["fileid"].executable = True
1097
inv["fileid"].text_sha1 = "ffff"
1098
inv["fileid"].text_size = 1
1099
# get fresh objects.
1100
chk_bytes = self.get_chk_bytes()
1101
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1102
bytes = ''.join(tmp_inv.to_lines())
1103
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1104
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1106
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1107
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1109
def test_file_entry_to_bytes(self):
1110
inv = CHKInventory(None)
1111
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1112
ie.executable = True
1113
ie.revision = 'file-rev-id'
1114
ie.text_sha1 = 'abcdefgh'
1116
bytes = inv._entry_to_bytes(ie)
1117
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1118
'file-rev-id\nabcdefgh\n100\nY', bytes)
1119
ie2 = inv._bytes_to_entry(bytes)
1120
self.assertEqual(ie, ie2)
1121
self.assertIsInstance(ie2.name, unicode)
1122
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1123
inv._bytes_to_utf8name_key(bytes))
1125
def test_file2_entry_to_bytes(self):
1126
inv = CHKInventory(None)
1128
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1129
ie.executable = False
1130
ie.revision = 'file-rev-id'
1131
ie.text_sha1 = '123456'
1133
bytes = inv._entry_to_bytes(ie)
1134
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1135
'file-rev-id\n123456\n25\nN', bytes)
1136
ie2 = inv._bytes_to_entry(bytes)
1137
self.assertEqual(ie, ie2)
1138
self.assertIsInstance(ie2.name, unicode)
1139
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1140
inv._bytes_to_utf8name_key(bytes))
1142
def test_dir_entry_to_bytes(self):
    """A directory entry serialises to the expected bytes and round-trips."""
    chk_inv = CHKInventory(None)
    entry = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
    entry.revision = 'dir-rev-id'
    serialised = chk_inv._entry_to_bytes(entry)
    self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id',
                     serialised)
    # Parsing the bytes back must give an equal entry with a unicode name.
    round_tripped = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(entry, round_tripped)
    self.assertIsInstance(round_tripped.name, unicode)
    expected_key = ('dirname', 'dir-id', 'dir-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1154
def test_dir2_entry_to_bytes(self):
1155
inv = CHKInventory(None)
1156
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1158
ie.revision = 'dir-rev-id'
1159
bytes = inv._entry_to_bytes(ie)
1160
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1161
'dir-rev-id', bytes)
1162
ie2 = inv._bytes_to_entry(bytes)
1163
self.assertEqual(ie, ie2)
1164
self.assertIsInstance(ie2.name, unicode)
1165
self.assertIs(ie2.parent_id, None)
1166
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1167
inv._bytes_to_utf8name_key(bytes))
1169
def test_symlink_entry_to_bytes(self):
    """A symlink entry serialises to the expected bytes and round-trips."""
    chk_inv = CHKInventory(None)
    entry = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
    entry.revision = 'link-rev-id'
    entry.symlink_target = u'target/path'
    serialised = chk_inv._entry_to_bytes(entry)
    self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
                     'link-rev-id\ntarget/path', serialised)
    # Parsing the bytes back must give an equal entry whose name and
    # symlink target are both unicode.
    round_tripped = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(entry, round_tripped)
    self.assertIsInstance(round_tripped.name, unicode)
    self.assertIsInstance(round_tripped.symlink_target, unicode)
    expected_key = ('linkname', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1184
def test_symlink2_entry_to_bytes(self):
    """A symlink with a non-ASCII name and target round-trips.

    The expected serialised form carries U+03A9 as the bytes \\xce\\xa9 in
    both the name and the target.
    """
    chk_inv = CHKInventory(None)
    entry = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
    entry.revision = 'link-rev-id'
    entry.symlink_target = u'target/\u03a9path'
    serialised = chk_inv._entry_to_bytes(entry)
    self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                     'link-rev-id\ntarget/\xce\xa9path', serialised)
    # Parsing the bytes back must give an equal entry whose name and
    # symlink target are both unicode.
    round_tripped = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(entry, round_tripped)
    self.assertIsInstance(round_tripped.name, unicode)
    self.assertIsInstance(round_tripped.symlink_target, unicode)
    expected_key = ('link\xce\xa9name', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1199
def test_tree_reference_entry_to_bytes(self):
1200
inv = CHKInventory(None)
1201
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1203
ie.revision = 'tree-rev-id'
1204
ie.reference_revision = 'ref-rev-id'
1205
bytes = inv._entry_to_bytes(ie)
1206
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1207
'tree-rev-id\nref-rev-id', bytes)
1208
ie2 = inv._bytes_to_entry(bytes)
1209
self.assertEqual(ie, ie2)
1210
self.assertIsInstance(ie2.name, unicode)
1211
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1212
inv._bytes_to_utf8name_key(bytes))
1215
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1217
def get_chk_bytes(self):
    """Return a store made by a groupcompress pack factory.

    The store is created on the transport this test returns for ''.
    """
    pack_factory = groupcompress.make_pack_factory(True, True, 1)
    base_transport = self.get_transport('')
    store = pack_factory(base_transport)
    return store
1222
def make_dir(self, inv, name, parent_id):
    """Add a 'directory' entry called ``name`` under ``parent_id`` to ``inv``.

    The entry's file id is ``name`` with '-id' appended.
    """
    file_id = name + '-id'
    dir_entry = inv.make_entry('directory', name, parent_id, file_id)
    inv.add(dir_entry)
1225
def make_file(self, inv, name, parent_id, content='content\n'):
1226
ie = inv.make_entry('file', name, parent_id, name + '-id')
1227
ie.text_sha1 = osutils.sha_string(content)
1228
ie.text_size = len(content)
1231
def make_simple_inventory(self):
1232
inv = Inventory('TREE_ROOT')
1233
inv.revision_id = "revid"
1234
inv.root.revision = "rootrev"
1237
# sub-file1 sub-file1-id
1238
# sub-file2 sub-file2-id
1239
# sub-dir1/ sub-dir1-id
1240
# subsub-file1 subsub-file1-id
1242
# sub2-file1 sub2-file1-id
1244
self.make_dir(inv, 'dir1', 'TREE_ROOT')
1245
self.make_dir(inv, 'dir2', 'TREE_ROOT')
1246
self.make_dir(inv, 'sub-dir1', 'dir1-id')
1247
self.make_file(inv, 'top', 'TREE_ROOT')
1248
self.make_file(inv, 'sub-file1', 'dir1-id')
1249
self.make_file(inv, 'sub-file2', 'dir1-id')
1250
self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1251
self.make_file(inv, 'sub2-file1', 'dir2-id')
1252
chk_bytes = self.get_chk_bytes()
1253
# use a small maximum_size to force internal paging structures
1254
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1256
search_key_name='hash-255-way')
1257
bytes = ''.join(chk_inv.to_lines())
1258
return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1260
def assert_Getitems(self, expected_fileids, inv, file_ids):
    """Assert that ``inv._getitems(file_ids)`` returns ``expected_fileids``.

    Both the expected ids and the file ids of the returned entries are
    sorted before comparing, so ordering is ignored.
    """
    wanted = sorted(expected_fileids)
    found = sorted(entry.file_id for entry in inv._getitems(file_ids))
    self.assertEqual(wanted, found)
1264
def assertExpand(self, all_ids, inv, file_ids):
1266
val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1267
self.assertEqual(set(all_ids), val_all_ids)
1268
entries = inv._getitems(val_all_ids)
1269
expected_children = {}
1270
for entry in entries:
1271
s = expected_children.setdefault(entry.parent_id, [])
1272
s.append(entry.file_id)
1273
val_children = dict((k, sorted(v)) for k, v
1274
in val_children.iteritems())
1275
expected_children = dict((k, sorted(v)) for k, v
1276
in expected_children.iteritems())
1277
self.assertEqual(expected_children, val_children)
1279
def test_make_simple_inventory(self):
1280
inv = self.make_simple_inventory()
1282
for path, entry in inv.iter_entries_by_dir():
1283
layout.append((path, entry.file_id))
1286
('dir1', 'dir1-id'),
1287
('dir2', 'dir2-id'),
1289
('dir1/sub-dir1', 'sub-dir1-id'),
1290
('dir1/sub-file1', 'sub-file1-id'),
1291
('dir1/sub-file2', 'sub-file2-id'),
1292
('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1293
('dir2/sub2-file1', 'sub2-file1-id'),
1296
def test__getitems(self):
1297
inv = self.make_simple_inventory()
1299
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1300
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1301
self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1303
self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1305
self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1306
['dir1-id', 'sub-file2-id'])
1307
self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1308
self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1310
def test_single_file(self):
    """Expanding 'top-id' is expected to give the root plus the file."""
    simple_inv = self.make_simple_inventory()
    wanted_ids = ['TREE_ROOT', 'top-id']
    self.assertExpand(wanted_ids, simple_inv, ['top-id'])
1314
def test_get_all_parents(self):
1315
inv = self.make_simple_inventory()
1316
self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1318
], inv, ['subsub-file1-id'])
1320
def test_get_children(self):
    """Expanding 'dir1-id' is expected to pull in everything beneath it.

    The expected set is the root, dir1, its sub-directory and the three
    files found under them.
    """
    simple_inv = self.make_simple_inventory()
    wanted_ids = [
        'TREE_ROOT', 'dir1-id', 'sub-dir1-id',
        'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
    ]
    self.assertExpand(wanted_ids, simple_inv, ['dir1-id'])
1326
def test_from_root(self):
    """Expanding 'TREE_ROOT' is expected to give all nine listed ids."""
    simple_inv = self.make_simple_inventory()
    wanted_ids = [
        'TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
        'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
        'subsub-file1-id', 'top-id',
    ]
    self.assertExpand(wanted_ids, simple_inv, ['TREE_ROOT'])
1332
def test_top_level_file(self):
    """Expanding the top-level file id gives the root and that file."""
    # NOTE(review): this body is currently identical to test_single_file.
    simple_inv = self.make_simple_inventory()
    requested = ['top-id']
    self.assertExpand(['TREE_ROOT', 'top-id'], simple_inv, requested)
1336
def test_subsub_file(self):
    """Expanding 'subsub-file1-id' is expected to add its ancestors.

    The expected set is the file plus sub-dir1, dir1 and the root.
    """
    simple_inv = self.make_simple_inventory()
    wanted_ids = ['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'subsub-file1-id']
    self.assertExpand(wanted_ids, simple_inv, ['subsub-file1-id'])
1341
def test_sub_and_root(self):
    """Expanding 'top-id' and 'subsub-file1-id' together.

    The expected set holds both files, plus dir1, sub-dir1 and the root.
    """
    simple_inv = self.make_simple_inventory()
    wanted_ids = [
        'TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
        'subsub-file1-id',
    ]
    requested = ['top-id', 'subsub-file1-id']
    self.assertExpand(wanted_ids, simple_inv, requested)
534
class TestRevert(TestCaseWithTransport):
536
def test_dangling_id(self):
537
wt = self.make_branch_and_tree('b1')
538
self.assertEqual(len(wt.inventory), 1)
539
open('b1/a', 'wb').write('a test\n')
541
self.assertEqual(len(wt.inventory), 2)
544
self.assertEqual(len(wt.inventory), 1)