1
# Copyright (C) 2005 by Canonical Ltd
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from cStringIO import StringIO
20
from bzrlib.branch import Branch
21
import bzrlib.errors as errors
22
from bzrlib.diff import internal_diff
23
from bzrlib.inventory import Inventory, ROOT_ID
24
import bzrlib.inventory as inventory
25
from bzrlib.osutils import has_symlinks, rename, pathjoin
26
from bzrlib.tests import TestCase, TestCaseWithTransport
29
class TestInventory(TestCase):
31
def test_is_within(self):
32
from bzrlib.osutils import is_inside_any
34
SRC_FOO_C = pathjoin('src', 'foo.c')
35
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
39
self.assert_(is_inside_any(dirs, fn))
41
for dirs, fn in [(['src'], 'srccontrol'),
42
(['src'], 'srccontrol/foo')]:
43
self.assertFalse(is_inside_any(dirs, fn))
46
"""Test detection of files within selected directories."""
49
for args in [('src', 'directory', 'src-id'),
50
('doc', 'directory', 'doc-id'),
51
('src/hello.c', 'file'),
52
('src/bye.c', 'file', 'bye-id'),
53
('Makefile', 'file')]:
56
self.assertEqual(inv.path2id('src'), 'src-id')
57
self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
59
self.assert_('src-id' in inv)
62
def test_version(self):
63
"""Inventory remembers the text's version."""
65
ie = inv.add_path('foo.txt', 'file')
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
28
InventoryDirectory, InventoryEntry, TreeReference)
29
from bzrlib.tests import (
31
TestCaseWithTransport,
34
split_suite_by_condition,
36
from bzrlib.tests.per_workingtree import workingtree_formats
39
def load_tests(standard_tests, module, loader):
40
"""Parameterise some inventory tests."""
41
to_adapt, result = split_suite_by_condition(standard_tests,
42
condition_isinstance(TestDeltaApplication))
44
('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
# Working tree basis delta application
47
# Repository add_inv_by_delta.
48
# Reduce form of the per_repository test logic - that logic needs to be
49
# able to get /just/ repositories whereas these tests are fine with
50
# just creating trees.
52
for _, format in repository.format_registry.iteritems():
53
scenarios.append((str(format.__name__), {
54
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
for format in workingtree_formats():
58
(str(format.__class__.__name__) + ".update_basis_by_delta", {
59
'apply_delta':apply_inventory_WT_basis,
62
(str(format.__class__.__name__) + ".apply_inventory_delta", {
63
'apply_delta':apply_inventory_WT,
65
return multiply_tests(to_adapt, scenarios, result)
68
def apply_inventory_Inventory(self, basis, delta):
69
"""Apply delta to basis and return the result.
71
:param basis: An inventory to be used as the basis.
72
:param delta: The inventory delta to apply:
73
:return: An inventory resulting from the application.
75
basis.apply_delta(delta)
79
def apply_inventory_WT(self, basis, delta):
80
"""Apply delta to basis and return the result.
82
This sets the tree state to be basis, and then calls apply_inventory_delta.
84
:param basis: An inventory to be used as the basis.
85
:param delta: The inventory delta to apply:
86
:return: An inventory resulting from the application.
88
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
89
control.create_repository()
90
control.create_branch()
91
tree = self.format.initialize(control)
94
tree._write_inventory(basis)
97
# Fresh object, reads disk again.
98
tree = tree.bzrdir.open_workingtree()
101
tree.apply_inventory_delta(delta)
104
# reload tree - ensure we get what was written.
105
tree = tree.bzrdir.open_workingtree()
107
self.addCleanup(tree.unlock)
108
# One could add 'tree._validate' here but that would cause 'early' failures
109
# as far as higher level code is concerned. Possibly adding an
110
# expect_fail parameter to this function and if that is False then do a
112
return tree.inventory
115
def apply_inventory_WT_basis(self, basis, delta):
116
"""Apply delta to basis and return the result.
118
This sets the parent and then calls update_basis_by_delta.
119
It also puts the basis in the repository under both 'basis' and 'result' to
120
allow safety checks made by the WT to succeed, and finally ensures that all
121
items in the delta with a new path are present in the WT before calling
122
update_basis_by_delta.
124
:param basis: An inventory to be used as the basis.
125
:param delta: The inventory delta to apply:
126
:return: An inventory resulting from the application.
128
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
129
control.create_repository()
130
control.create_branch()
131
tree = self.format.initialize(control)
134
repo = tree.branch.repository
135
repo.start_write_group()
137
rev = revision.Revision('basis', timestamp=0, timezone=None,
138
message="", committer="foo@example.com")
139
basis.revision_id = 'basis'
140
repo.add_revision('basis', rev, basis)
141
# Add a revision for the result, with the basis content -
142
# update_basis_by_delta doesn't check that the delta results in
143
# result, and we want inconsistent deltas to get called on the
144
# tree, or else the code isn't actually checked.
145
rev = revision.Revision('result', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = 'result'
148
repo.add_revision('result', rev, basis)
150
repo.abort_write_group()
153
repo.commit_write_group()
154
# Set the basis state as the trees current state
155
tree._write_inventory(basis)
156
# This reads basis from the repo and puts it into the tree's local
157
# cache, if it has one.
158
tree.set_parent_ids(['basis'])
161
for old, new, id, entry in delta:
162
if None in (new, entry):
164
paths[new] = (entry.file_id, entry.kind)
165
parents.add(osutils.dirname(new))
166
parents = osutils.minimum_path_selection(parents)
168
# Put place holders in the tree to permit adding the other entries.
169
for pos, parent in enumerate(parents):
170
if not tree.path2id(parent):
171
# add a synthetic directory in the tree so we can put the
172
# tree0 entries in place for dirstate.
173
tree.add([parent], ["id%d" % pos], ["directory"])
175
# Many deltas may cause this mini-apply to fail, but we want to see what
176
# the delta application code says, not the prep that we do to deal with
177
# limitations of dirstate's update_basis code.
178
for path, (file_id, kind) in sorted(paths.items()):
180
tree.add([path], [file_id], [kind])
181
except (KeyboardInterrupt, SystemExit):
187
# Fresh lock, reads disk again.
190
tree.update_basis_by_delta('result', delta)
193
# reload tree - ensure we get what was written.
194
tree = tree.bzrdir.open_workingtree()
195
basis_tree = tree.basis_tree()
196
basis_tree.lock_read()
197
self.addCleanup(basis_tree.unlock)
198
# Note, that if the tree does not have a local cache, the trick above of
199
# setting the result as the basis, will come back to bite us. That said,
200
# all the implementations in bzr do have a local cache.
201
return basis_tree.inventory
204
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
205
"""Apply delta to basis and return the result.
207
This inserts basis as a whole inventory and then uses
208
add_inventory_by_delta to add delta.
210
:param basis: An inventory to be used as the basis.
211
:param delta: The inventory delta to apply:
212
:return: An inventory resulting from the application.
214
format = self.format()
215
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
216
repo = format.initialize(control)
219
repo.start_write_group()
221
rev = revision.Revision('basis', timestamp=0, timezone=None,
222
message="", committer="foo@example.com")
223
basis.revision_id = 'basis'
224
repo.add_revision('basis', rev, basis)
226
repo.abort_write_group()
229
repo.commit_write_group()
234
repo.start_write_group()
236
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
239
repo.abort_write_group()
242
repo.commit_write_group()
245
# Fresh lock, reads disk again.
246
repo = repo.bzrdir.open_repository()
248
self.addCleanup(repo.unlock)
249
return repo.get_inventory('result')
252
class TestDeltaApplication(TestCaseWithTransport):
254
def get_empty_inventory(self, reference_inv=None):
255
"""Get an empty inventory.
257
Note that tests should not depend on the revision of the root for
258
setting up test conditions, as it has to be flexible to accomodate non
259
rich root repositories.
261
:param reference_inv: If not None, get the revision for the root from
262
this inventory. This is useful for dealing with older repositories
263
that routinely discarded the root entry data. If None, the root's
264
revision is set to 'basis'.
266
inv = inventory.Inventory()
267
if reference_inv is not None:
268
inv.root.revision = reference_inv.root.revision
270
inv.root.revision = 'basis'
273
def test_empty_delta(self):
274
inv = self.get_empty_inventory()
276
inv = self.apply_delta(self, inv, delta)
277
inv2 = self.get_empty_inventory(inv)
278
self.assertEqual([], inv2._make_delta(inv))
280
def test_None_file_id(self):
281
inv = self.get_empty_inventory()
282
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
283
dir1.revision = 'result'
284
delta = [(None, u'dir1', None, dir1)]
285
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
288
def test_unicode_file_id(self):
289
inv = self.get_empty_inventory()
290
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
291
dir1.revision = 'result'
292
delta = [(None, u'dir1', dir1.file_id, dir1)]
293
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
296
def test_repeated_file_id(self):
297
inv = self.get_empty_inventory()
298
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
299
file1.revision = 'result'
302
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
303
file2.revision = 'result'
306
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
307
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
310
def test_repeated_new_path(self):
311
inv = self.get_empty_inventory()
312
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
313
file1.revision = 'result'
316
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
317
file2.revision = 'result'
320
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
321
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
324
def test_repeated_old_path(self):
325
inv = self.get_empty_inventory()
326
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
327
file1.revision = 'result'
330
# We can't *create* a source inventory with the same path, but
331
# a badly generated partial delta might claim the same source twice.
332
# This would be buggy in two ways: the path is repeated in the delta,
333
# And the path for one of the file ids doesn't match the source
334
# location. Alternatively, we could have a repeated fileid, but that
335
# is separately checked for.
336
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
337
file2.revision = 'result'
342
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
343
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
346
def test_mismatched_id_entry_id(self):
347
inv = self.get_empty_inventory()
348
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
349
file1.revision = 'result'
352
delta = [(None, u'path', 'id', file1)]
353
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
356
def test_mismatched_new_path_entry_None(self):
357
inv = self.get_empty_inventory()
358
delta = [(None, u'path', 'id', None)]
359
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
362
def test_mismatched_new_path_None_entry(self):
363
inv = self.get_empty_inventory()
364
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
365
file1.revision = 'result'
368
delta = [(u"path", None, 'id1', file1)]
369
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
372
def test_parent_is_not_directory(self):
373
inv = self.get_empty_inventory()
374
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
375
file1.revision = 'result'
378
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
379
file2.revision = 'result'
383
delta = [(None, u'path/path2', 'id2', file2)]
384
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
def test_parent_is_missing(self):
388
inv = self.get_empty_inventory()
389
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
390
file2.revision = 'result'
393
delta = [(None, u'path/path2', 'id2', file2)]
394
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
397
def test_new_parent_path_has_wrong_id(self):
398
inv = self.get_empty_inventory()
399
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
400
parent1.revision = 'result'
401
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
402
parent2.revision = 'result'
403
file1 = inventory.InventoryFile('id', 'path', 'p-2')
404
file1.revision = 'result'
409
# This delta claims that file1 is at dir/path, but actually it's at
410
# dir2/path if you follow the inventory parent structure.
411
delta = [(None, u'dir/path', 'id', file1)]
412
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
415
def test_old_parent_path_is_wrong(self):
416
inv = self.get_empty_inventory()
417
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
418
parent1.revision = 'result'
419
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
420
parent2.revision = 'result'
421
file1 = inventory.InventoryFile('id', 'path', 'p-2')
422
file1.revision = 'result'
428
# This delta claims that file1 was at dir/path, but actually it was at
429
# dir2/path if you follow the inventory parent structure.
430
delta = [(u'dir/path', None, 'id', None)]
431
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
434
def test_old_parent_path_is_for_other_id(self):
435
inv = self.get_empty_inventory()
436
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
437
parent1.revision = 'result'
438
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
439
parent2.revision = 'result'
440
file1 = inventory.InventoryFile('id', 'path', 'p-2')
441
file1.revision = 'result'
444
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
445
file2.revision = 'result'
452
# This delta claims that file1 was at dir/path, but actually it was at
453
# dir2/path if you follow the inventory parent structure. At dir/path
454
# is another entry we should not delete.
455
delta = [(u'dir/path', None, 'id', None)]
456
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
459
def test_add_existing_id_new_path(self):
460
inv = self.get_empty_inventory()
461
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
462
parent1.revision = 'result'
463
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
464
parent2.revision = 'result'
466
delta = [(None, u'dir2', 'p-1', parent2)]
467
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
470
def test_add_new_id_existing_path(self):
471
inv = self.get_empty_inventory()
472
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
473
parent1.revision = 'result'
474
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
475
parent2.revision = 'result'
477
delta = [(None, u'dir1', 'p-2', parent2)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_remove_dir_leaving_dangling_child(self):
482
inv = self.get_empty_inventory()
483
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
484
dir1.revision = 'result'
485
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
486
dir2.revision = 'result'
487
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
488
dir3.revision = 'result'
492
delta = [(u'dir1', None, 'p-1', None),
493
(u'dir1/child2', None, 'p-3', None)]
494
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
69
498
class TestInventoryEntry(TestCase):
131
560
link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
132
561
self.failIf(link.has_text())
135
class TestEntryDiffing(TestCaseWithTransport):
138
super(TestEntryDiffing, self).setUp()
139
self.wt = self.make_branch_and_tree('.')
140
self.branch = self.wt.branch
141
print >> open('file', 'wb'), 'foo'
142
self.wt.add(['file'], ['fileid'])
144
os.symlink('target1', 'symlink')
145
self.wt.add(['symlink'], ['linkid'])
146
self.wt.commit('message_1', rev_id = '1')
147
print >> open('file', 'wb'), 'bar'
150
os.symlink('target2', 'symlink')
151
self.tree_1 = self.branch.repository.revision_tree('1')
152
self.inv_1 = self.branch.repository.get_inventory('1')
153
self.file_1 = self.inv_1['fileid']
154
self.tree_2 = self.wt
155
self.inv_2 = self.tree_2.read_working_inventory()
156
self.file_2 = self.inv_2['fileid']
158
self.link_1 = self.inv_1['linkid']
159
self.link_2 = self.inv_2['linkid']
161
def test_file_diff_deleted(self):
163
self.file_1.diff(internal_diff,
164
"old_label", self.tree_1,
165
"/dev/null", None, None,
167
self.assertEqual(output.getvalue(), "--- old_label\t\n"
173
def test_file_diff_added(self):
175
self.file_1.diff(internal_diff,
176
"new_label", self.tree_1,
177
"/dev/null", None, None,
178
output, reverse=True)
179
self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
185
def test_file_diff_changed(self):
187
self.file_1.diff(internal_diff,
188
"/dev/null", self.tree_1,
189
"new_label", self.file_2, self.tree_2,
191
self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
198
def test_link_diff_deleted(self):
199
if not has_symlinks():
202
self.link_1.diff(internal_diff,
203
"old_label", self.tree_1,
204
"/dev/null", None, None,
206
self.assertEqual(output.getvalue(),
207
"=== target was 'target1'\n")
209
def test_link_diff_added(self):
210
if not has_symlinks():
213
self.link_1.diff(internal_diff,
214
"new_label", self.tree_1,
215
"/dev/null", None, None,
216
output, reverse=True)
217
self.assertEqual(output.getvalue(),
218
"=== target is 'target1'\n")
220
def test_link_diff_changed(self):
221
if not has_symlinks():
224
self.link_1.diff(internal_diff,
225
"/dev/null", self.tree_1,
226
"new_label", self.link_2, self.tree_2,
228
self.assertEqual(output.getvalue(),
229
"=== target changed 'target1' => 'target2'\n")
232
class TestSnapshot(TestCaseWithTransport):
235
# for full testing we'll need a branch
236
# with a subdir to test parent changes.
237
# and a file, link and dir under that.
238
# but right now I only need one attribute
239
# to change, and then test merge patterns
240
# with fake parent entries.
241
super(TestSnapshot, self).setUp()
242
self.wt = self.make_branch_and_tree('.')
243
self.branch = self.wt.branch
244
self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
245
self.wt.add(['subdir', 'subdir/file'],
249
self.wt.commit('message_1', rev_id = '1')
250
self.tree_1 = self.branch.repository.revision_tree('1')
251
self.inv_1 = self.branch.repository.get_inventory('1')
252
self.file_1 = self.inv_1['fileid']
253
self.file_active = self.wt.inventory['fileid']
255
def test_snapshot_new_revision(self):
256
# This tests that a simple commit with no parents makes a new
257
# revision value in the inventory entry
258
self.file_active.snapshot('2', 'subdir/file', {}, self.wt,
259
self.branch.repository.weave_store,
260
self.branch.get_transaction())
261
# expected outcome - file_1 has a revision id of '2', and we can get
262
# its text of 'file contents' out of the weave.
263
self.assertEqual(self.file_1.revision, '1')
264
self.assertEqual(self.file_active.revision, '2')
265
# this should be a separate test probably, but lets check it once..
266
lines = self.branch.repository.weave_store.get_lines('fileid','2',
267
self.branch.get_transaction())
268
self.assertEqual(lines, ['contents of subdir/file\n'])
270
def test_snapshot_unchanged(self):
271
#This tests that a simple commit does not make a new entry for
272
# an unchanged inventory entry
273
self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
275
self.branch.repository.weave_store,
276
self.branch.get_transaction())
277
self.assertEqual(self.file_1.revision, '1')
278
self.assertEqual(self.file_active.revision, '1')
279
self.assertRaises(errors.WeaveError,
280
self.branch.repository.weave_store.get_lines,
281
'fileid', '2', self.branch.get_transaction())
283
def test_snapshot_merge_identical_different_revid(self):
284
# This tests that a commit with two identical parents, one of which has
285
# a different revision id, results in a new revision id in the entry.
286
# 1->other, commit a merge of other against 1, results in 2.
287
other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
288
other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
289
other_ie.revision = '1'
290
other_ie.text_sha1 = self.file_1.text_sha1
291
other_ie.text_size = self.file_1.text_size
292
self.assertEqual(self.file_1, other_ie)
293
other_ie.revision = 'other'
294
self.assertNotEqual(self.file_1, other_ie)
295
self.branch.repository.weave_store.add_identical_text('fileid', '1',
296
'other', ['1'], self.branch.get_transaction())
297
self.file_active.snapshot('2', 'subdir/file',
298
{'1':self.file_1, 'other':other_ie},
300
self.branch.repository.weave_store,
301
self.branch.get_transaction())
302
self.assertEqual(self.file_active.revision, '2')
304
def test_snapshot_changed(self):
305
# This tests that a commit with one different parent results in a new
306
# revision id in the entry.
307
self.file_active.name='newname'
308
rename('subdir/file', 'subdir/newname')
309
self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1},
311
self.branch.repository.weave_store,
312
self.branch.get_transaction())
313
# expected outcome - file_1 has a revision id of '2'
314
self.assertEqual(self.file_active.revision, '2')
317
class TestPreviousHeads(TestCaseWithTransport):
320
# we want several inventories, that respectively
321
# give us the following scenarios:
322
# A) fileid not in any inventory (A),
323
# B) fileid present in one inventory (B) and (A,B)
324
# C) fileid present in two inventories, and they
325
# are not mutual descendents (B, C)
326
# D) fileid present in two inventories and one is
327
# a descendent of the other. (B, D)
328
super(TestPreviousHeads, self).setUp()
329
self.wt = self.make_branch_and_tree('.')
330
self.branch = self.wt.branch
331
self.build_tree(['file'])
332
self.wt.commit('new branch', allow_pointless=True, rev_id='A')
333
self.inv_A = self.branch.repository.get_inventory('A')
334
self.wt.add(['file'], ['fileid'])
335
self.wt.commit('add file', rev_id='B')
336
self.inv_B = self.branch.repository.get_inventory('B')
337
self.branch.lock_write()
563
def test_make_entry(self):
    # Each kind string passed to make_entry must produce an instance of
    # the matching inventory entry class.
    expected_classes = (
        ("file", inventory.InventoryFile),
        ("symlink", inventory.InventoryLink),
        ("directory", inventory.InventoryDirectory),
        )
    for kind, entry_class in expected_classes:
        entry = inventory.make_entry(kind, "name", ROOT_ID)
        self.assertIsInstance(entry, entry_class)
571
def test_make_entry_non_normalized(self):
572
orig_normalized_filename = osutils.normalized_filename
339
self.branch.control_files.put_utf8('revision-history', 'A\n')
575
osutils.normalized_filename = osutils._accessible_normalized_filename
576
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
577
self.assertEqual(u'\xe5', entry.name)
578
self.assertIsInstance(entry, inventory.InventoryFile)
580
osutils.normalized_filename = osutils._inaccessible_normalized_filename
581
self.assertRaises(errors.InvalidNormalization,
582
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
342
self.assertEqual(self.branch.revision_history(), ['A'])
343
self.wt.commit('another add of file', rev_id='C')
344
self.inv_C = self.branch.repository.get_inventory('C')
345
self.wt.add_pending_merge('B')
346
self.wt.commit('merge in B', rev_id='D')
347
self.inv_D = self.branch.repository.get_inventory('D')
348
self.file_active = self.wt.inventory['fileid']
349
self.weave = self.branch.repository.weave_store.get_weave('fileid',
350
self.branch.get_transaction())
352
def get_previous_heads(self, inventories):
    # Delegate to the active file entry, passing along the weave held on
    # this test case, and hand back whatever it reports.
    active_entry = self.file_active
    return active_entry.find_previous_heads(inventories, self.weave)
355
def test_fileid_in_no_inventory(self):
356
self.assertEqual({}, self.get_previous_heads([self.inv_A]))
358
def test_fileid_in_one_inventory(self):
359
self.assertEqual({'B':self.inv_B['fileid']},
360
self.get_previous_heads([self.inv_B]))
361
self.assertEqual({'B':self.inv_B['fileid']},
362
self.get_previous_heads([self.inv_A, self.inv_B]))
363
self.assertEqual({'B':self.inv_B['fileid']},
364
self.get_previous_heads([self.inv_B, self.inv_A]))
366
def test_fileid_in_two_inventories_gives_both_entries(self):
367
self.assertEqual({'B':self.inv_B['fileid'],
368
'C':self.inv_C['fileid']},
369
self.get_previous_heads([self.inv_B, self.inv_C]))
370
self.assertEqual({'B':self.inv_B['fileid'],
371
'C':self.inv_C['fileid']},
372
self.get_previous_heads([self.inv_C, self.inv_B]))
374
def test_fileid_in_two_inventories_already_merged_gives_head(self):
375
self.assertEqual({'D':self.inv_D['fileid']},
376
self.get_previous_heads([self.inv_B, self.inv_D]))
377
self.assertEqual({'D':self.inv_D['fileid']},
378
self.get_previous_heads([self.inv_D, self.inv_B]))
380
# TODO: test two inventories with the same file revision
383
class TestExecutable(TestCaseWithTransport):
385
def test_stays_executable(self):
386
basic_inv = """<inventory format="5">
387
<file file_id="a-20051208024829-849e76f7968d7a86" name="a" executable="yes" />
388
<file file_id="b-20051208024829-849e76f7968d7a86" name="b" />
391
wt = self.make_branch_and_tree('b1')
393
open('b1/a', 'wb').write('a test\n')
394
open('b1/b', 'wb').write('b test\n')
395
os.chmod('b1/a', 0755)
396
os.chmod('b1/b', 0644)
397
# Manually writing the inventory, to ensure that
398
# the executable="yes" entry is set for 'a' and not for 'b'
399
open('b1/.bzr/inventory', 'wb').write(basic_inv)
401
a_id = "a-20051208024829-849e76f7968d7a86"
402
b_id = "b-20051208024829-849e76f7968d7a86"
403
wt = wt.bzrdir.open_workingtree()
404
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
406
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
407
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
409
wt.commit('adding a,b', rev_id='r1')
411
rev_tree = b.repository.revision_tree('r1')
412
self.failUnless(rev_tree.is_executable(a_id), "'a' lost the execute bit")
413
self.failIf(rev_tree.is_executable(b_id), "'b' gained an execute bit")
415
self.failUnless(rev_tree.inventory[a_id].executable)
416
self.failIf(rev_tree.inventory[b_id].executable)
418
# Make sure the entries are gone
421
self.failIf(wt.has_id(a_id))
422
self.failIf(wt.has_filename('a'))
423
self.failIf(wt.has_id(b_id))
424
self.failIf(wt.has_filename('b'))
426
# Make sure that revert is able to bring them back,
427
# and sets 'a' back to being executable
429
wt.revert(['a', 'b'], rev_tree, backups=False)
430
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
432
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
433
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
435
# Now remove them again, and make sure that after a
436
# commit, they are still marked correctly
439
wt.commit('removed', rev_id='r2')
441
self.assertEqual([], [cn for cn,ie in wt.inventory.iter_entries()])
442
self.failIf(wt.has_id(a_id))
443
self.failIf(wt.has_filename('a'))
444
self.failIf(wt.has_id(b_id))
445
self.failIf(wt.has_filename('b'))
447
# Now revert back to the previous commit
448
wt.revert([], rev_tree, backups=False)
449
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
451
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
452
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
454
# Now make sure that 'bzr branch' also preserves the
456
# TODO: Maybe this should be a blackbox test
457
d2 = b.bzrdir.clone('b2', revision_id='r1')
458
t2 = d2.open_workingtree()
460
self.assertEquals('r1', b2.last_revision())
462
self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
463
self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
464
self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
466
# Make sure pull will delete the files
468
self.assertEquals('r2', b2.last_revision())
469
self.assertEqual([], [cn for cn,ie in t2.inventory.iter_entries()])
471
# Now commit the changes on the first branch
472
# so that the second branch can pull the changes
473
# and make sure that the executable bit has been copied
474
wt.commit('resurrected', rev_id='r3')
477
self.assertEquals('r3', b2.last_revision())
478
self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
480
self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
481
self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
483
class TestRevert(TestCaseWithTransport):
484
def test_dangling_id(self):
485
wt = self.make_branch_and_tree('b1')
486
self.assertEqual(len(wt.inventory), 1)
487
open('b1/a', 'wb').write('a test\n')
489
self.assertEqual(len(wt.inventory), 2)
492
self.assertEqual(len(wt.inventory), 1)
584
osutils.normalized_filename = orig_normalized_filename
587
class TestDescribeChanges(TestCase):
589
def test_describe_change(self):
590
# we need to test the following change combinations:
596
# renamed/reparented and modified
597
# change kind (perhaps can't be done yet?)
598
# also, merged in combination with all of these?
599
old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
600
old_a.text_sha1 = '123132'
602
new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
603
new_a.text_sha1 = '123132'
606
self.assertChangeDescription('unchanged', old_a, new_a)
609
new_a.text_sha1 = 'abcabc'
610
self.assertChangeDescription('modified', old_a, new_a)
612
self.assertChangeDescription('added', None, new_a)
613
self.assertChangeDescription('removed', old_a, None)
614
# perhaps a bit questionable but seems like the most reasonable thing...
615
self.assertChangeDescription('unchanged', None, None)
617
# in this case it's both renamed and modified; show a rename and
619
new_a.name = 'newfilename'
620
self.assertChangeDescription('modified and renamed', old_a, new_a)
622
# reparenting is 'renaming'
623
new_a.name = old_a.name
624
new_a.parent_id = 'somedir-id'
625
self.assertChangeDescription('modified and renamed', old_a, new_a)
627
# reset the content values so it's not modified
628
new_a.text_size = old_a.text_size
629
new_a.text_sha1 = old_a.text_sha1
630
new_a.name = old_a.name
632
new_a.name = 'newfilename'
633
self.assertChangeDescription('renamed', old_a, new_a)
635
# reparenting is 'renaming'
636
new_a.name = old_a.name
637
new_a.parent_id = 'somedir-id'
638
self.assertChangeDescription('renamed', old_a, new_a)
640
def assertChangeDescription(self, expected_change, old_ie, new_ie):
    """Assert that the old_ie -> new_ie change is described as expected.

    :param expected_change: The string InventoryEntry.describe_change is
        expected to return for this pair.
    :param old_ie: The entry before the change; callers in this file also
        pass None.
    :param new_ie: The entry after the change; callers in this file also
        pass None.
    """
    change = InventoryEntry.describe_change(old_ie, new_ie)
    self.assertEqual(expected_change, change)
645
class TestCHKInventory(TestCaseWithTransport):
647
def get_chk_bytes(self):
# Returns the chk_bytes attribute of a repository made in "." with the
# "development6-rich-root" format, after starting a write group on it.
# repo.unlock and repo.abort_write_group are registered as cleanups.
# NOTE(review): the statement numbered 651 in this listing is not visible
# here; presumably it takes the lock that the repo.unlock cleanup
# releases -- confirm against the original file.
648
# The easiest way to get a CHK store is a development6 repository and
649
# then work with the chk_bytes attribute directly.
650
repo = self.make_repository(".", format="development6-rich-root")
652
self.addCleanup(repo.unlock)
653
repo.start_write_group()
654
self.addCleanup(repo.abort_write_group)
655
return repo.chk_bytes
657
def read_bytes(self, chk_bytes, key):
    """Return the fulltext of the first record streamed for key.

    :param chk_bytes: Object providing get_record_stream(keys, ordering,
        include_delta_closure).
    :param key: The single key to request from chk_bytes.
    :return: Whatever the first record's get_bytes_as("fulltext") returns.
    """
    # Request just this one key, unordered, with the delta closure included.
    stream = chk_bytes.get_record_stream([key], 'unordered', True)
    return stream.next().get_bytes_as("fulltext")
661
def test_deserialise_gives_CHKInventory(self):
# Converts `inv` with CHKInventory.from_inventory, joins to_lines() and
# deserialises the result, then checks the revision id, the root entry's
# attributes and that _search_key_name is 'plain'.
# NOTE(review): the line numbered 662 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
663
inv.revision_id = "revid"
664
inv.root.revision = "rootrev"
665
chk_bytes = self.get_chk_bytes()
666
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
667
bytes = ''.join(chk_inv.to_lines())
668
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
669
self.assertEqual("revid", new_inv.revision_id)
670
self.assertEqual("directory", new_inv.root.kind)
671
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
672
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
673
self.assertEqual(inv.root.name, new_inv.root.name)
674
self.assertEqual("rootrev", new_inv.root.revision)
675
self.assertEqual('plain', new_inv._search_key_name)
677
def test_deserialise_wrong_revid(self):
# Serialises `inv` (revision id "revid") via CHKInventory and expects
# CHKInventory.deserialise to raise ValueError.
# NOTE(review): the line numbered 678 (presumably binding `inv`) and the
# remaining assertRaises arguments (685-686) are not visible in this
# listing; presumably a key with a different revision id is passed --
# confirm against the original file.
679
inv.revision_id = "revid"
680
inv.root.revision = "rootrev"
681
chk_bytes = self.get_chk_bytes()
682
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
683
bytes = ''.join(chk_inv.to_lines())
684
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
687
def test_captures_rev_root_byid(self):
# Builds a CHKInventory from `inv` (revision id "foo", root revision
# "bar"), lists expected to_lines() content, then deserialises the lines
# and checks that _search_key_name is 'plain'.
# NOTE(review): the lines numbered 688 (presumably binding `inv`),
# 694-695 and 700 (presumably the opening and closing of the assertion
# around the expected strings) are not visible here -- confirm.
689
inv.revision_id = "foo"
690
inv.root.revision = "bar"
691
chk_bytes = self.get_chk_bytes()
692
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
693
lines = chk_inv.to_lines()
696
'revision_id: foo\n',
697
'root_id: TREE_ROOT\n',
698
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
699
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
701
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
702
self.assertEqual('plain', chk_inv._search_key_name)
704
def test_captures_parent_id_basename_index(self):
# Builds a CHKInventory from `inv` (revision id "foo", root revision
# "bar"), lists expected to_lines() content including the
# parent_id_basename_to_file_id line, then deserialises the lines and
# checks that _search_key_name is 'plain'.
# NOTE(review): the lines numbered 705 (presumably binding `inv`),
# 711-712 and 717 (presumably the opening and closing of the assertion
# around the expected strings) are not visible here -- confirm.
706
inv.revision_id = "foo"
707
inv.root.revision = "bar"
708
chk_bytes = self.get_chk_bytes()
709
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
710
lines = chk_inv.to_lines()
713
'revision_id: foo\n',
714
'root_id: TREE_ROOT\n',
715
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
716
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
718
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
719
self.assertEqual('plain', chk_inv._search_key_name)
721
def test_captures_search_key_name(self):
# Builds a CHKInventory from `inv` with search_key_name='hash-16-way',
# lists expected to_lines() content including the search_key_name line,
# then deserialises the lines and checks _search_key_name is preserved.
# NOTE(review): the lines numbered 722 (presumably binding `inv`),
# 729-730 and 736 (presumably the opening and closing of the assertion
# around the expected strings) are not visible here -- confirm.
723
inv.revision_id = "foo"
724
inv.root.revision = "bar"
725
chk_bytes = self.get_chk_bytes()
726
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
727
search_key_name='hash-16-way')
728
lines = chk_inv.to_lines()
731
'search_key_name: hash-16-way\n',
732
'root_id: TREE_ROOT\n',
733
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
734
'revision_id: foo\n',
735
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
737
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
738
self.assertEqual('hash-16-way', chk_inv._search_key_name)
740
def test_directory_children_on_demand(self):
# Round-trips `inv` (root plus one file) through CHKInventory, asserts the
# root entry's _children is None before .children is read, then compares
# the child reached via root_entry.children with new_inv["fileid"].
# NOTE(review): the line numbered 741 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
742
inv.revision_id = "revid"
743
inv.root.revision = "rootrev"
744
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
745
inv["fileid"].revision = "filerev"
746
inv["fileid"].executable = True
747
inv["fileid"].text_sha1 = "ffff"
748
inv["fileid"].text_size = 1
749
chk_bytes = self.get_chk_bytes()
750
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
751
bytes = ''.join(chk_inv.to_lines())
752
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
753
root_entry = new_inv[inv.root.file_id]
754
self.assertEqual(None, root_entry._children)
755
self.assertEqual(['file'], root_entry.children.keys())
756
file_direct = new_inv["fileid"]
757
file_found = root_entry.children['file']
758
self.assertEqual(file_direct.kind, file_found.kind)
759
self.assertEqual(file_direct.file_id, file_found.file_id)
760
self.assertEqual(file_direct.parent_id, file_found.parent_id)
761
self.assertEqual(file_direct.name, file_found.name)
762
self.assertEqual(file_direct.revision, file_found.revision)
763
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
764
self.assertEqual(file_direct.text_size, file_found.text_size)
765
self.assertEqual(file_direct.executable, file_found.executable)
767
def test_from_inventory_maximum_size(self):
# Passes 120 as the third positional argument of from_inventory and checks
# that both maps' root nodes report maximum_size 120, with _key_width 1
# for id_to_entry and 2 for parent_id_basename_to_file_id.
# NOTE(review): the line numbered 769 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
768
# from_inventory supports the maximum_size parameter.
770
inv.revision_id = "revid"
771
inv.root.revision = "rootrev"
772
chk_bytes = self.get_chk_bytes()
773
chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
774
chk_inv.id_to_entry._ensure_root()
775
self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
776
self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
777
p_id_basename = chk_inv.parent_id_basename_to_file_id
778
p_id_basename._ensure_root()
779
self.assertEqual(120, p_id_basename._root_node.maximum_size)
780
self.assertEqual(2, p_id_basename._root_node._key_width)
782
def test___iter__(self):
# Round-trips `inv` (root plus one file) through CHKInventory and checks
# that iterating the result yields the root file id and "fileid".
# NOTE(review): the lines numbered 783 (presumably binding `inv`) and 796
# (between building `fileids` and the assertion; presumably a sort) are
# not visible in this listing -- confirm against the original file.
784
inv.revision_id = "revid"
785
inv.root.revision = "rootrev"
786
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
787
inv["fileid"].revision = "filerev"
788
inv["fileid"].executable = True
789
inv["fileid"].text_sha1 = "ffff"
790
inv["fileid"].text_size = 1
791
chk_bytes = self.get_chk_bytes()
792
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
793
bytes = ''.join(chk_inv.to_lines())
794
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
795
fileids = list(new_inv.__iter__())
797
self.assertEqual([inv.root.file_id, "fileid"], fileids)
799
def test__len__(self):
# Builds a CHKInventory from `inv` (root plus one file) and checks that
# len() of it is 2.
# NOTE(review): the line numbered 800 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
801
inv.revision_id = "revid"
802
inv.root.revision = "rootrev"
803
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
804
inv["fileid"].revision = "filerev"
805
inv["fileid"].executable = True
806
inv["fileid"].text_sha1 = "ffff"
807
inv["fileid"].text_size = 1
808
chk_bytes = self.get_chk_bytes()
809
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
810
self.assertEqual(2, len(chk_inv))
812
def test___getitem__(self):
# Round-trips `inv` (root plus one file) through CHKInventory, looks up
# both entries by file id and checks each attribute; finally expects
# errors.NoSuchId when looking up 'missing'.
# NOTE(review): the line numbered 813 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
814
inv.revision_id = "revid"
815
inv.root.revision = "rootrev"
816
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
817
inv["fileid"].revision = "filerev"
818
inv["fileid"].executable = True
819
inv["fileid"].text_sha1 = "ffff"
820
inv["fileid"].text_size = 1
821
chk_bytes = self.get_chk_bytes()
822
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
823
bytes = ''.join(chk_inv.to_lines())
824
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
825
root_entry = new_inv[inv.root.file_id]
826
file_entry = new_inv["fileid"]
827
self.assertEqual("directory", root_entry.kind)
828
self.assertEqual(inv.root.file_id, root_entry.file_id)
829
self.assertEqual(inv.root.parent_id, root_entry.parent_id)
830
self.assertEqual(inv.root.name, root_entry.name)
831
self.assertEqual("rootrev", root_entry.revision)
832
self.assertEqual("file", file_entry.kind)
833
self.assertEqual("fileid", file_entry.file_id)
834
self.assertEqual(inv.root.file_id, file_entry.parent_id)
835
self.assertEqual("file", file_entry.name)
836
self.assertEqual("filerev", file_entry.revision)
837
self.assertEqual("ffff", file_entry.text_sha1)
838
self.assertEqual(1, file_entry.text_size)
839
self.assertEqual(True, file_entry.executable)
840
self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
842
def test_has_id_true(self):
# Builds a CHKInventory from `inv` (root plus one file) and checks that
# has_id is true for 'fileid' and for the root's file id.
# NOTE(review): the line numbered 843 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
844
inv.revision_id = "revid"
845
inv.root.revision = "rootrev"
846
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
847
inv["fileid"].revision = "filerev"
848
inv["fileid"].executable = True
849
inv["fileid"].text_sha1 = "ffff"
850
inv["fileid"].text_size = 1
851
chk_bytes = self.get_chk_bytes()
852
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
853
self.assertTrue(chk_inv.has_id('fileid'))
854
self.assertTrue(chk_inv.has_id(inv.root.file_id))
856
def test_has_id_not(self):
# Builds a CHKInventory from `inv` without adding any file and checks that
# has_id('fileid') is false.
# NOTE(review): the line numbered 857 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
858
inv.revision_id = "revid"
859
inv.root.revision = "rootrev"
860
chk_bytes = self.get_chk_bytes()
861
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
862
self.assertFalse(chk_inv.has_id('fileid'))
864
def test_id2path(self):
# Round-trips `inv` through CHKInventory and checks id2path gives '' for
# the root, 'dir' for 'dirid' and 'dir/file' for 'fileid'.
# NOTE(review): the lines numbered 865 (presumably binding `inv`) and
# 870-871 (presumably adding direntry and fileentry to `inv`) are not
# visible in this listing -- confirm against the original file.
866
inv.revision_id = "revid"
867
inv.root.revision = "rootrev"
868
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
869
fileentry = InventoryFile("fileid", "file", "dirid")
872
inv["fileid"].revision = "filerev"
873
inv["fileid"].executable = True
874
inv["fileid"].text_sha1 = "ffff"
875
inv["fileid"].text_size = 1
876
inv["dirid"].revision = "filerev"
877
chk_bytes = self.get_chk_bytes()
878
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
879
bytes = ''.join(chk_inv.to_lines())
880
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
881
self.assertEqual('', new_inv.id2path(inv.root.file_id))
882
self.assertEqual('dir', new_inv.id2path('dirid'))
883
self.assertEqual('dir/file', new_inv.id2path('fileid'))
885
def test_path2id(self):
# Round-trips `inv` through CHKInventory and checks path2id gives the
# root's file id for '', 'dirid' for 'dir' and 'fileid' for 'dir/file'.
# NOTE(review): the lines numbered 886 (presumably binding `inv`) and
# 891-892 (presumably adding direntry and fileentry to `inv`) are not
# visible in this listing -- confirm against the original file.
887
inv.revision_id = "revid"
888
inv.root.revision = "rootrev"
889
direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
890
fileentry = InventoryFile("fileid", "file", "dirid")
893
inv["fileid"].revision = "filerev"
894
inv["fileid"].executable = True
895
inv["fileid"].text_sha1 = "ffff"
896
inv["fileid"].text_size = 1
897
inv["dirid"].revision = "filerev"
898
chk_bytes = self.get_chk_bytes()
899
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
900
bytes = ''.join(chk_inv.to_lines())
901
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
902
self.assertEqual(inv.root.file_id, new_inv.path2id(''))
903
self.assertEqual('dirid', new_inv.path2id('dir'))
904
self.assertEqual('fileid', new_inv.path2id('dir/file'))
906
def test_create_by_apply_delta_sets_root(self):
# Builds a base CHKInventory, then gives `inv` a root with id "myrootid"
# and builds a reference CHKInventory from it. Applies a delta that
# removes the base root and adds the new one, and checks the resulting
# root equals the reference root.
# NOTE(review): the line numbered 907 in this listing, which presumably
# binds `inv`, is not visible here -- confirm against the original file.
908
inv.revision_id = "revid"
909
chk_bytes = self.get_chk_bytes()
910
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
911
inv.add_path("", "directory", "myrootid", None)
912
inv.revision_id = "expectedid"
913
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
# Each delta item is a 4-tuple; the first removes the old root path, the
# second adds "" with the new root entry.
914
delta = [("", None, base_inv.root.file_id, None),
915
(None, "", "myrootid", inv.root)]
916
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
917
self.assertEquals(reference_inv.root, new_inv.root)
919
def test_create_by_apply_delta_empty_add_child(self):
# Applies a delta adding file "A" to a base CHKInventory and checks the
# result has the same revision_id, root_id and id_to_entry root node key
# as a reference CHKInventory built directly from `inv`.
# NOTE(review): the lines numbered 920 (presumably binding `inv`) and 930
# (presumably adding a_entry to `inv` before the reference is built) are
# not visible in this listing -- confirm against the original file.
921
inv.revision_id = "revid"
922
inv.root.revision = "rootrev"
923
chk_bytes = self.get_chk_bytes()
924
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
925
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
926
a_entry.revision = "filerev"
927
a_entry.executable = True
928
a_entry.text_sha1 = "ffff"
929
a_entry.text_size = 1
931
inv.revision_id = "expectedid"
932
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
933
delta = [(None, "A", "A-id", a_entry)]
934
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
935
# new_inv should be the same as reference_inv.
936
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
937
self.assertEqual(reference_inv.root_id, new_inv.root_id)
938
reference_inv.id_to_entry._ensure_root()
939
new_inv.id_to_entry._ensure_root()
940
self.assertEqual(reference_inv.id_to_entry._root_node._key,
941
new_inv.id_to_entry._root_node._key)
943
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
# Applies a delta adding file "A" to a base CHKInventory and checks the
# result matches a reference CHKInventory on revision_id, root_id and the
# root node keys of both id_to_entry and parent_id_basename_to_file_id.
# NOTE(review): the lines numbered 944 (presumably binding `inv`) and 954
# (presumably adding a_entry to `inv` before the reference is built) are
# not visible in this listing -- confirm against the original file.
945
inv.revision_id = "revid"
946
inv.root.revision = "rootrev"
947
chk_bytes = self.get_chk_bytes()
948
base_inv = CHKInventory.from_inventory(chk_bytes, inv)
949
a_entry = InventoryFile("A-id", "A", inv.root.file_id)
950
a_entry.revision = "filerev"
951
a_entry.executable = True
952
a_entry.text_sha1 = "ffff"
953
a_entry.text_size = 1
955
inv.revision_id = "expectedid"
956
reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
957
delta = [(None, "A", "A-id", a_entry)]
958
new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
959
reference_inv.id_to_entry._ensure_root()
960
reference_inv.parent_id_basename_to_file_id._ensure_root()
961
new_inv.id_to_entry._ensure_root()
962
new_inv.parent_id_basename_to_file_id._ensure_root()
963
# new_inv should be the same as reference_inv.
964
self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
965
self.assertEqual(reference_inv.root_id, new_inv.root_id)
966
self.assertEqual(reference_inv.id_to_entry._root_node._key,
967
new_inv.id_to_entry._root_node._key)
968
self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
969
new_inv.parent_id_basename_to_file_id._root_node._key)
971
def test_iter_changes(self):
# Round-trips two inventories that differ only in the attributes of
# "fileid" through CHKInventory and compares list(inv_1.iter_changes(inv_2))
# with a single expected change tuple for that file.
# NOTE(review): the lines numbered 974 and 982 (presumably binding `inv`
# and `inv2`) and 1000 (the end of the expected tuple) are not visible in
# this listing -- confirm against the original file.
972
# Low level bootstrapping smoke test; comprehensive generic tests via
973
# InterTree are coming.
975
inv.revision_id = "revid"
976
inv.root.revision = "rootrev"
977
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
978
inv["fileid"].revision = "filerev"
979
inv["fileid"].executable = True
980
inv["fileid"].text_sha1 = "ffff"
981
inv["fileid"].text_size = 1
983
inv2.revision_id = "revid2"
984
inv2.root.revision = "rootrev"
985
inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
986
inv2["fileid"].revision = "filerev2"
987
inv2["fileid"].executable = False
988
inv2["fileid"].text_sha1 = "bbbb"
989
inv2["fileid"].text_size = 2
991
chk_bytes = self.get_chk_bytes()
992
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
993
bytes = ''.join(chk_inv.to_lines())
994
inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
995
chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
996
bytes = ''.join(chk_inv2.to_lines())
997
inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
998
self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
999
('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1001
list(inv_1.iter_changes(inv_2)))
1003
def test_parent_id_basename_to_file_id_index_enabled(self):
# Round-trips `inv` (root plus one file) through CHKInventory, checks that
# parent_id_basename_to_file_id is a chk_map.CHKMap, and compares its
# items with the expected (parent id, basename) -> file id mapping.
# NOTE(review): the lines numbered 1004 (presumably binding `inv`) and
# 1018 (presumably the opening of the assertion around the expected dict)
# are not visible in this listing -- confirm against the original file.
1005
inv.revision_id = "revid"
1006
inv.root.revision = "rootrev"
1007
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1008
inv["fileid"].revision = "filerev"
1009
inv["fileid"].executable = True
1010
inv["fileid"].text_sha1 = "ffff"
1011
inv["fileid"].text_size = 1
1012
# get fresh objects.
1013
chk_bytes = self.get_chk_bytes()
1014
tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1015
bytes = ''.join(tmp_inv.to_lines())
1016
chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1017
self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1019
{('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1020
dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1022
def test_file_entry_to_bytes(self):
# Serialises an executable InventoryFile with CHKInventory._entry_to_bytes,
# checks the exact text, then checks that _bytes_to_entry gives back an
# equal entry with a unicode name and that _bytes_to_utf8name_key gives
# (name, file id, revision).
# NOTE(review): the line numbered 1028 in this listing is not visible
# here; the expected text contains '100', so presumably it sets
# ie.text_size -- confirm against the original file.
1023
inv = CHKInventory(None)
1024
ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1025
ie.executable = True
1026
ie.revision = 'file-rev-id'
1027
ie.text_sha1 = 'abcdefgh'
1029
bytes = inv._entry_to_bytes(ie)
1030
self.assertEqual('file: file-id\nparent-id\nfilename\n'
1031
'file-rev-id\nabcdefgh\n100\nY', bytes)
1032
ie2 = inv._bytes_to_entry(bytes)
1033
self.assertEqual(ie, ie2)
1034
self.assertIsInstance(ie2.name, unicode)
1035
self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1036
inv._bytes_to_utf8name_key(bytes))
1038
def test_file2_entry_to_bytes(self):
# Same round-trip as for a plain file entry, but with a non-ASCII name
# (u'\u03a9name') and executable False; the expected text carries the
# name as the bytes '\xce\xa9name' and ends in 'N'.
# NOTE(review): the lines numbered 1040 and 1045 in this listing are not
# visible here; the expected text contains '25', so presumably one of
# them sets ie.text_size -- confirm against the original file.
1039
inv = CHKInventory(None)
1041
ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1042
ie.executable = False
1043
ie.revision = 'file-rev-id'
1044
ie.text_sha1 = '123456'
1046
bytes = inv._entry_to_bytes(ie)
1047
self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1048
'file-rev-id\n123456\n25\nN', bytes)
1049
ie2 = inv._bytes_to_entry(bytes)
1050
self.assertEqual(ie, ie2)
1051
self.assertIsInstance(ie2.name, unicode)
1052
self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1053
inv._bytes_to_utf8name_key(bytes))
1055
def test_dir_entry_to_bytes(self):
    """A directory entry round-trips through the CHK entry byte form.

    Serialises an InventoryDirectory with an ASCII name, checks the exact
    serialised text, then checks that parsing it back gives an equal entry
    with a unicode name and the expected (name, file id, revision) key.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
    ie.revision = 'dir-rev-id'
    # Named entry_bytes rather than 'bytes' so the builtin is not shadowed.
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id',
                     entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1067
def test_dir2_entry_to_bytes(self):
# Round-trips an InventoryDirectory with a non-ASCII name through
# _entry_to_bytes / _bytes_to_entry; the expected text has an empty
# parent-id field and the parsed entry's parent_id must be None.
# NOTE(review): the line numbered 1070 in this listing, the rest of the
# InventoryDirectory(...) call, is not visible here; presumably it passes
# None as the parent id -- confirm against the original file.
1068
inv = CHKInventory(None)
1069
ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1071
ie.revision = 'dir-rev-id'
1072
bytes = inv._entry_to_bytes(ie)
1073
self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1074
'dir-rev-id', bytes)
1075
ie2 = inv._bytes_to_entry(bytes)
1076
self.assertEqual(ie, ie2)
1077
self.assertIsInstance(ie2.name, unicode)
1078
self.assertIs(ie2.parent_id, None)
1079
self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1080
inv._bytes_to_utf8name_key(bytes))
1082
def test_symlink_entry_to_bytes(self):
    """A symlink entry round-trips through the CHK entry byte form.

    Serialises an InventoryLink with an ASCII name and target, checks the
    exact serialised text, then checks that parsing it back gives an equal
    entry whose name and symlink_target are unicode, and that the
    (name, file id, revision) key is as expected.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
    ie.revision = 'link-rev-id'
    ie.symlink_target = u'target/path'
    # Named entry_bytes rather than 'bytes' so the builtin is not shadowed.
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
                     'link-rev-id\ntarget/path', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertIsInstance(ie2.symlink_target, unicode)
    self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1097
def test_symlink2_entry_to_bytes(self):
    """A symlink entry with non-ASCII name and target round-trips.

    Uses u'\\u03a9' in both the link name and the target; the expected
    serialised text carries it as the bytes '\\xce\\xa9'. After parsing the
    text back, the entry must equal the original, with unicode name and
    symlink_target, and the key must hold the byte-encoded name.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
    ie.revision = 'link-rev-id'
    ie.symlink_target = u'target/\u03a9path'
    # Named entry_bytes rather than 'bytes' so the builtin is not shadowed.
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                     'link-rev-id\ntarget/\xce\xa9path', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertIsInstance(ie2.symlink_target, unicode)
    self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1112
def test_tree_reference_entry_to_bytes(self):
# Round-trips a TreeReference with a non-ASCII name through
# _entry_to_bytes / _bytes_to_entry and checks the exact text, which ends
# with the revision and then the reference_revision.
# NOTE(review): the line numbered 1115 in this listing, the rest of the
# TreeReference(...) call, is not visible here; the expected text has
# 'parent-id' in the parent field, so presumably that is what it passes
# -- confirm against the original file.
1113
inv = CHKInventory(None)
1114
ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1116
ie.revision = 'tree-rev-id'
1117
ie.reference_revision = 'ref-rev-id'
1118
bytes = inv._entry_to_bytes(ie)
1119
self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1120
'tree-rev-id\nref-rev-id', bytes)
1121
ie2 = inv._bytes_to_entry(bytes)
1122
self.assertEqual(ie, ie2)
1123
self.assertIsInstance(ie2.name, unicode)
1124
self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1125
inv._bytes_to_utf8name_key(bytes))