1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
1
# Copyright (C) 2005 by Canonical Ltd
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
11
# GNU General Public License for more details.
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
28
InventoryDirectory, InventoryEntry, TreeReference)
29
from bzrlib.tests import (
31
TestCaseWithTransport,
34
split_suite_by_condition,
36
from bzrlib.tests.per_workingtree import workingtree_formats
39
def load_tests(standard_tests, module, loader):
40
"""Parameterise some inventory tests."""
41
to_adapt, result = split_suite_by_condition(standard_tests,
42
condition_isinstance(TestDeltaApplication))
44
('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
# Working tree basis delta application
47
# Repository add_inv_by_delta.
48
# Reduced form of the per_repository test logic - that logic needs to be
49
# able to get /just/ repositories whereas these tests are fine with
50
# just creating trees.
52
for _, format in repository.format_registry.iteritems():
53
scenarios.append((str(format.__name__), {
54
'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
for format in workingtree_formats():
58
(str(format.__class__.__name__) + ".update_basis_by_delta", {
59
'apply_delta':apply_inventory_WT_basis,
62
(str(format.__class__.__name__) + ".apply_inventory_delta", {
63
'apply_delta':apply_inventory_WT,
65
return multiply_tests(to_adapt, scenarios, result)
68
def apply_inventory_Inventory(self, basis, delta):
69
"""Apply delta to basis and return the result.
71
:param basis: An inventory to be used as the basis.
72
:param delta: The inventory delta to apply:
73
:return: An inventory resulting from the application.
75
basis.apply_delta(delta)
79
def apply_inventory_WT(self, basis, delta):
80
"""Apply delta to basis and return the result.
82
This sets the tree state to be basis, and then calls apply_inventory_delta.
84
:param basis: An inventory to be used as the basis.
85
:param delta: The inventory delta to apply:
86
:return: An inventory resulting from the application.
88
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
89
control.create_repository()
90
control.create_branch()
91
tree = self.format.initialize(control)
94
tree._write_inventory(basis)
97
# Fresh object, reads disk again.
98
tree = tree.bzrdir.open_workingtree()
101
tree.apply_inventory_delta(delta)
104
# reload tree - ensure we get what was written.
105
tree = tree.bzrdir.open_workingtree()
107
self.addCleanup(tree.unlock)
108
# One could add 'tree._validate' here but that would cause 'early' failures
109
# as far as higher level code is concerned. Possibly adding an
110
# expect_fail parameter to this function and if that is False then do a
112
return tree.inventory
115
def apply_inventory_WT_basis(self, basis, delta):
116
"""Apply delta to basis and return the result.
118
This sets the parent and then calls update_basis_by_delta.
119
It also puts the basis in the repository under both 'basis' and 'result' to
120
allow safety checks made by the WT to succeed, and finally ensures that all
121
items in the delta with a new path are present in the WT before calling
122
update_basis_by_delta.
124
:param basis: An inventory to be used as the basis.
125
:param delta: The inventory delta to apply:
126
:return: An inventory resulting from the application.
128
control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
129
control.create_repository()
130
control.create_branch()
131
tree = self.format.initialize(control)
134
repo = tree.branch.repository
135
repo.start_write_group()
137
rev = revision.Revision('basis', timestamp=0, timezone=None,
138
message="", committer="foo@example.com")
139
basis.revision_id = 'basis'
140
repo.add_revision('basis', rev, basis)
141
# Add a revision for the result, with the basis content -
142
# update_basis_by_delta doesn't check that the delta results in
143
# result, and we want inconsistent deltas to get called on the
144
# tree, or else the code isn't actually checked.
145
rev = revision.Revision('result', timestamp=0, timezone=None,
146
message="", committer="foo@example.com")
147
basis.revision_id = 'result'
148
repo.add_revision('result', rev, basis)
150
repo.abort_write_group()
153
repo.commit_write_group()
154
# Set the basis state as the trees current state
155
tree._write_inventory(basis)
156
# This reads basis from the repo and puts it into the tree's local
157
# cache, if it has one.
158
tree.set_parent_ids(['basis'])
161
for old, new, id, entry in delta:
162
if None in (new, entry):
164
paths[new] = (entry.file_id, entry.kind)
165
parents.add(osutils.dirname(new))
166
parents = osutils.minimum_path_selection(parents)
168
# Put place holders in the tree to permit adding the other entries.
169
for pos, parent in enumerate(parents):
170
if not tree.path2id(parent):
171
# add a synthetic directory in the tree so we can put the
172
# tree0 entries in place for dirstate.
173
tree.add([parent], ["id%d" % pos], ["directory"])
175
# Many deltas may cause this mini-apply to fail, but we want to see what
176
# the delta application code says, not the prep that we do to deal with
177
# limitations of dirstate's update_basis code.
178
for path, (file_id, kind) in sorted(paths.items()):
180
tree.add([path], [file_id], [kind])
181
except (KeyboardInterrupt, SystemExit):
187
# Fresh lock, reads disk again.
190
tree.update_basis_by_delta('result', delta)
193
# reload tree - ensure we get what was written.
194
tree = tree.bzrdir.open_workingtree()
195
basis_tree = tree.basis_tree()
196
basis_tree.lock_read()
197
self.addCleanup(basis_tree.unlock)
198
# Note, that if the tree does not have a local cache, the trick above of
199
# setting the result as the basis, will come back to bite us. That said,
200
# all the implementations in bzr do have a local cache.
201
return basis_tree.inventory
204
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
205
"""Apply delta to basis and return the result.
207
This inserts basis as a whole inventory and then uses
208
add_inventory_by_delta to add delta.
210
:param basis: An inventory to be used as the basis.
211
:param delta: The inventory delta to apply:
212
:return: An inventory resulting from the application.
214
format = self.format()
215
control = self.make_bzrdir('tree', format=format._matchingbzrdir)
216
repo = format.initialize(control)
219
repo.start_write_group()
221
rev = revision.Revision('basis', timestamp=0, timezone=None,
222
message="", committer="foo@example.com")
223
basis.revision_id = 'basis'
224
repo.add_revision('basis', rev, basis)
226
repo.abort_write_group()
229
repo.commit_write_group()
234
repo.start_write_group()
236
inv_sha1 = repo.add_inventory_by_delta('basis', delta,
239
repo.abort_write_group()
242
repo.commit_write_group()
245
# Fresh lock, reads disk again.
246
repo = repo.bzrdir.open_repository()
248
self.addCleanup(repo.unlock)
249
return repo.get_inventory('result')
252
class TestDeltaApplication(TestCaseWithTransport):
254
def get_empty_inventory(self, reference_inv=None):
255
"""Get an empty inventory.
257
Note that tests should not depend on the revision of the root for
258
setting up test conditions, as it has to be flexible to accommodate non
259
rich root repositories.
261
:param reference_inv: If not None, get the revision for the root from
262
this inventory. This is useful for dealing with older repositories
263
that routinely discarded the root entry data. If None, the root's
264
revision is set to 'basis'.
266
inv = inventory.Inventory()
267
if reference_inv is not None:
268
inv.root.revision = reference_inv.root.revision
270
inv.root.revision = 'basis'
273
def test_empty_delta(self):
274
inv = self.get_empty_inventory()
276
inv = self.apply_delta(self, inv, delta)
277
inv2 = self.get_empty_inventory(inv)
278
self.assertEqual([], inv2._make_delta(inv))
280
def test_None_file_id(self):
281
inv = self.get_empty_inventory()
282
dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
283
dir1.revision = 'result'
284
delta = [(None, u'dir1', None, dir1)]
285
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
288
def test_unicode_file_id(self):
289
inv = self.get_empty_inventory()
290
dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
291
dir1.revision = 'result'
292
delta = [(None, u'dir1', dir1.file_id, dir1)]
293
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
296
def test_repeated_file_id(self):
297
inv = self.get_empty_inventory()
298
file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
299
file1.revision = 'result'
302
file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
303
file2.revision = 'result'
306
delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
307
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
310
def test_repeated_new_path(self):
311
inv = self.get_empty_inventory()
312
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
313
file1.revision = 'result'
316
file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
317
file2.revision = 'result'
320
delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
321
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
324
def test_repeated_old_path(self):
325
inv = self.get_empty_inventory()
326
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
327
file1.revision = 'result'
330
# We can't *create* a source inventory with the same path, but
331
# a badly generated partial delta might claim the same source twice.
332
# This would be buggy in two ways: the path is repeated in the delta,
333
# And the path for one of the file ids doesn't match the source
334
# location. Alternatively, we could have a repeated fileid, but that
335
# is separately checked for.
336
file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
337
file2.revision = 'result'
342
delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
343
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
346
def test_mismatched_id_entry_id(self):
347
inv = self.get_empty_inventory()
348
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
349
file1.revision = 'result'
352
delta = [(None, u'path', 'id', file1)]
353
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
356
def test_mismatched_new_path_entry_None(self):
357
inv = self.get_empty_inventory()
358
delta = [(None, u'path', 'id', None)]
359
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
362
def test_mismatched_new_path_None_entry(self):
363
inv = self.get_empty_inventory()
364
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
365
file1.revision = 'result'
368
delta = [(u"path", None, 'id1', file1)]
369
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
372
def test_parent_is_not_directory(self):
373
inv = self.get_empty_inventory()
374
file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
375
file1.revision = 'result'
378
file2 = inventory.InventoryFile('id2', 'path2', 'id1')
379
file2.revision = 'result'
383
delta = [(None, u'path/path2', 'id2', file2)]
384
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
def test_parent_is_missing(self):
388
inv = self.get_empty_inventory()
389
file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
390
file2.revision = 'result'
393
delta = [(None, u'path/path2', 'id2', file2)]
394
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
397
def test_new_parent_path_has_wrong_id(self):
398
inv = self.get_empty_inventory()
399
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
400
parent1.revision = 'result'
401
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
402
parent2.revision = 'result'
403
file1 = inventory.InventoryFile('id', 'path', 'p-2')
404
file1.revision = 'result'
409
# This delta claims that file1 is at dir/path, but actually it's at
410
# dir2/path if you follow the inventory parent structure.
411
delta = [(None, u'dir/path', 'id', file1)]
412
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
415
def test_old_parent_path_is_wrong(self):
416
inv = self.get_empty_inventory()
417
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
418
parent1.revision = 'result'
419
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
420
parent2.revision = 'result'
421
file1 = inventory.InventoryFile('id', 'path', 'p-2')
422
file1.revision = 'result'
428
# This delta claims that file1 was at dir/path, but actually it was at
429
# dir2/path if you follow the inventory parent structure.
430
delta = [(u'dir/path', None, 'id', None)]
431
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
434
def test_old_parent_path_is_for_other_id(self):
435
inv = self.get_empty_inventory()
436
parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
437
parent1.revision = 'result'
438
parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
439
parent2.revision = 'result'
440
file1 = inventory.InventoryFile('id', 'path', 'p-2')
441
file1.revision = 'result'
444
file2 = inventory.InventoryFile('id2', 'path', 'p-1')
445
file2.revision = 'result'
452
# This delta claims that file1 was at dir/path, but actually it was at
453
# dir2/path if you follow the inventory parent structure. At dir/path
454
# is another entry we should not delete.
455
delta = [(u'dir/path', None, 'id', None)]
456
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
459
def test_add_existing_id_new_path(self):
460
inv = self.get_empty_inventory()
461
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
462
parent1.revision = 'result'
463
parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
464
parent2.revision = 'result'
466
delta = [(None, u'dir2', 'p-1', parent2)]
467
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
470
def test_add_new_id_existing_path(self):
471
inv = self.get_empty_inventory()
472
parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
473
parent1.revision = 'result'
474
parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
475
parent2.revision = 'result'
477
delta = [(None, u'dir1', 'p-2', parent2)]
478
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
481
def test_remove_dir_leaving_dangling_child(self):
482
inv = self.get_empty_inventory()
483
dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
484
dir1.revision = 'result'
485
dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
486
dir2.revision = 'result'
487
dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
488
dir3.revision = 'result'
492
delta = [(u'dir1', None, 'p-1', None),
493
(u'dir1/child2', None, 'p-3', None)]
494
self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
from cStringIO import StringIO
20
from bzrlib.branch import Branch
21
import bzrlib.errors as errors
22
from bzrlib.diff import internal_diff
23
from bzrlib.inventory import (Inventory, ROOT_ID, InventoryFile,
24
InventoryDirectory, InventoryEntry)
25
import bzrlib.inventory as inventory
26
from bzrlib.osutils import has_symlinks, rename, pathjoin
27
from bzrlib.tests import TestCase, TestCaseWithTransport
28
from bzrlib.transform import TreeTransform
29
from bzrlib.uncommit import uncommit
32
class TestInventory(TestCase):
34
def test_is_within(self):
35
from bzrlib.osutils import is_inside_any
37
SRC_FOO_C = pathjoin('src', 'foo.c')
38
for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
42
self.assert_(is_inside_any(dirs, fn))
44
for dirs, fn in [(['src'], 'srccontrol'),
45
(['src'], 'srccontrol/foo')]:
46
self.assertFalse(is_inside_any(dirs, fn))
49
"""Test detection of files within selected directories."""
52
for args in [('src', 'directory', 'src-id'),
53
('doc', 'directory', 'doc-id'),
54
('src/hello.c', 'file'),
55
('src/bye.c', 'file', 'bye-id'),
56
('Makefile', 'file')]:
59
self.assertEqual(inv.path2id('src'), 'src-id')
60
self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
62
self.assert_('src-id' in inv)
64
def test_iter_entries(self):
67
for args in [('src', 'directory', 'src-id'),
68
('doc', 'directory', 'doc-id'),
69
('src/hello.c', 'file', 'hello-id'),
70
('src/bye.c', 'file', 'bye-id'),
71
('Makefile', 'file', 'makefile-id')]:
75
('Makefile', 'makefile-id'),
78
('src/bye.c', 'bye-id'),
79
('src/hello.c', 'hello-id'),
80
], [(path, ie.file_id) for path, ie in inv.iter_entries()])
82
def test_version(self):
83
"""Inventory remembers the text's version."""
85
ie = inv.add_path('foo.txt', 'file')
498
89
class TestInventoryEntry(TestCase):
568
159
self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
569
160
inventory.InventoryDirectory)
571
def test_make_entry_non_normalized(self):
572
orig_normalized_filename = osutils.normalized_filename
575
osutils.normalized_filename = osutils._accessible_normalized_filename
576
entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
577
self.assertEqual(u'\xe5', entry.name)
578
self.assertIsInstance(entry, inventory.InventoryFile)
580
osutils.normalized_filename = osutils._inaccessible_normalized_filename
581
self.assertRaises(errors.InvalidNormalization,
582
inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
584
osutils.normalized_filename = orig_normalized_filename
162
class TestEntryDiffing(TestCaseWithTransport):
165
super(TestEntryDiffing, self).setUp()
166
self.wt = self.make_branch_and_tree('.')
167
self.branch = self.wt.branch
168
print >> open('file', 'wb'), 'foo'
169
print >> open('binfile', 'wb'), 'foo'
170
self.wt.add(['file'], ['fileid'])
171
self.wt.add(['binfile'], ['binfileid'])
173
os.symlink('target1', 'symlink')
174
self.wt.add(['symlink'], ['linkid'])
175
self.wt.commit('message_1', rev_id = '1')
176
print >> open('file', 'wb'), 'bar'
177
print >> open('binfile', 'wb'), 'x' * 1023 + '\x00'
180
os.symlink('target2', 'symlink')
181
self.tree_1 = self.branch.repository.revision_tree('1')
182
self.inv_1 = self.branch.repository.get_inventory('1')
183
self.file_1 = self.inv_1['fileid']
184
self.file_1b = self.inv_1['binfileid']
185
self.tree_2 = self.wt
186
self.inv_2 = self.tree_2.read_working_inventory()
187
self.file_2 = self.inv_2['fileid']
188
self.file_2b = self.inv_2['binfileid']
190
self.link_1 = self.inv_1['linkid']
191
self.link_2 = self.inv_2['linkid']
193
def test_file_diff_deleted(self):
195
self.file_1.diff(internal_diff,
196
"old_label", self.tree_1,
197
"/dev/null", None, None,
199
self.assertEqual(output.getvalue(), "--- old_label\n"
205
def test_file_diff_added(self):
207
self.file_1.diff(internal_diff,
208
"new_label", self.tree_1,
209
"/dev/null", None, None,
210
output, reverse=True)
211
self.assertEqual(output.getvalue(), "--- /dev/null\n"
217
def test_file_diff_changed(self):
219
self.file_1.diff(internal_diff,
220
"/dev/null", self.tree_1,
221
"new_label", self.file_2, self.tree_2,
223
self.assertEqual(output.getvalue(), "--- /dev/null\n"
230
def test_file_diff_binary(self):
232
self.file_1.diff(internal_diff,
233
"/dev/null", self.tree_1,
234
"new_label", self.file_2b, self.tree_2,
236
self.assertEqual(output.getvalue(),
237
"Binary files /dev/null and new_label differ\n")
238
def test_link_diff_deleted(self):
239
if not has_symlinks():
242
self.link_1.diff(internal_diff,
243
"old_label", self.tree_1,
244
"/dev/null", None, None,
246
self.assertEqual(output.getvalue(),
247
"=== target was 'target1'\n")
249
def test_link_diff_added(self):
250
if not has_symlinks():
253
self.link_1.diff(internal_diff,
254
"new_label", self.tree_1,
255
"/dev/null", None, None,
256
output, reverse=True)
257
self.assertEqual(output.getvalue(),
258
"=== target is 'target1'\n")
260
def test_link_diff_changed(self):
261
if not has_symlinks():
264
self.link_1.diff(internal_diff,
265
"/dev/null", self.tree_1,
266
"new_label", self.link_2, self.tree_2,
268
self.assertEqual(output.getvalue(),
269
"=== target changed 'target1' => 'target2'\n")
272
class TestSnapshot(TestCaseWithTransport):
275
# for full testing we'll need a branch
276
# with a subdir to test parent changes.
277
# and a file, link and dir under that.
278
# but right now I only need one attribute
279
# to change, and then test merge patterns
280
# with fake parent entries.
281
super(TestSnapshot, self).setUp()
282
self.wt = self.make_branch_and_tree('.')
283
self.branch = self.wt.branch
284
self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
285
self.wt.add(['subdir', 'subdir/file'],
289
self.wt.commit('message_1', rev_id = '1')
290
self.tree_1 = self.branch.repository.revision_tree('1')
291
self.inv_1 = self.branch.repository.get_inventory('1')
292
self.file_1 = self.inv_1['fileid']
293
self.file_active = self.wt.inventory['fileid']
295
def test_snapshot_new_revision(self):
296
# This tests that a simple commit with no parents makes a new
297
# revision value in the inventory entry
298
self.file_active.snapshot('2', 'subdir/file', {}, self.wt,
299
self.branch.repository.weave_store,
300
self.branch.get_transaction())
301
# expected outcome - file_1 has a revision id of '2', and we can get
302
# its text of 'file contents' out of the weave.
303
self.assertEqual(self.file_1.revision, '1')
304
self.assertEqual(self.file_active.revision, '2')
305
# this should be a separate test probably, but let's check it once..
306
lines = self.branch.repository.weave_store.get_weave(
308
self.branch.get_transaction()).get_lines('2')
309
self.assertEqual(lines, ['contents of subdir/file\n'])
311
def test_snapshot_unchanged(self):
312
#This tests that a simple commit does not make a new entry for
313
# an unchanged inventory entry
314
self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
316
self.branch.repository.weave_store,
317
self.branch.get_transaction())
318
self.assertEqual(self.file_1.revision, '1')
319
self.assertEqual(self.file_active.revision, '1')
320
vf = self.branch.repository.weave_store.get_weave(
322
self.branch.repository.get_transaction())
323
self.assertRaises(errors.RevisionNotPresent,
327
def test_snapshot_merge_identical_different_revid(self):
328
# This tests that a commit with two identical parents, one of which has
329
# a different revision id, results in a new revision id in the entry.
330
# 1->other, commit a merge of other against 1, results in 2.
331
other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
332
other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
333
other_ie.revision = '1'
334
other_ie.text_sha1 = self.file_1.text_sha1
335
other_ie.text_size = self.file_1.text_size
336
self.assertEqual(self.file_1, other_ie)
337
other_ie.revision = 'other'
338
self.assertNotEqual(self.file_1, other_ie)
339
versionfile = self.branch.repository.weave_store.get_weave(
340
'fileid', self.branch.repository.get_transaction())
341
versionfile.clone_text('other', '1', ['1'])
342
self.file_active.snapshot('2', 'subdir/file',
343
{'1':self.file_1, 'other':other_ie},
345
self.branch.repository.weave_store,
346
self.branch.get_transaction())
347
self.assertEqual(self.file_active.revision, '2')
349
def test_snapshot_changed(self):
350
# This tests that a commit with one different parent results in a new
351
# revision id in the entry.
352
self.file_active.name='newname'
353
rename('subdir/file', 'subdir/newname')
354
self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1},
356
self.branch.repository.weave_store,
357
self.branch.get_transaction())
358
# expected outcome - file_1 has a revision id of '2'
359
self.assertEqual(self.file_active.revision, '2')
362
class TestPreviousHeads(TestCaseWithTransport):
365
# we want several inventories, that respectively
366
# give us the following scenarios:
367
# A) fileid not in any inventory (A),
368
# B) fileid present in one inventory (B) and (A,B)
369
# C) fileid present in two inventories, and they
370
# are not mutual descendents (B, C)
371
# D) fileid present in two inventories and one is
372
# a descendent of the other. (B, D)
373
super(TestPreviousHeads, self).setUp()
374
self.wt = self.make_branch_and_tree('.')
375
self.branch = self.wt.branch
376
self.build_tree(['file'])
377
self.wt.commit('new branch', allow_pointless=True, rev_id='A')
378
self.inv_A = self.branch.repository.get_inventory('A')
379
self.wt.add(['file'], ['fileid'])
380
self.wt.commit('add file', rev_id='B')
381
self.inv_B = self.branch.repository.get_inventory('B')
382
uncommit(self.branch, tree=self.wt)
383
self.assertEqual(self.branch.revision_history(), ['A'])
384
self.wt.commit('another add of file', rev_id='C')
385
self.inv_C = self.branch.repository.get_inventory('C')
386
self.wt.add_pending_merge('B')
387
self.wt.commit('merge in B', rev_id='D')
388
self.inv_D = self.branch.repository.get_inventory('D')
389
self.file_active = self.wt.inventory['fileid']
390
self.weave = self.branch.repository.weave_store.get_weave('fileid',
391
self.branch.repository.get_transaction())
393
def get_previous_heads(self, inventories):
394
return self.file_active.find_previous_heads(
396
self.branch.repository.weave_store,
397
self.branch.repository.get_transaction())
399
def test_fileid_in_no_inventory(self):
    # Consulting only inv_A must report no previous heads at all.
    heads = self.get_previous_heads([self.inv_A])
    self.assertEqual({}, heads)
402
def test_fileid_in_one_inventory(self):
    # Whether inv_B is supplied alone, after inv_A, or before inv_A, the
    # only head expected is 'B', mapped to inv_B's entry for 'fileid'.
    orderings = (
        [self.inv_B],
        [self.inv_A, self.inv_B],
        [self.inv_B, self.inv_A],
        )
    for inventories in orderings:
        expected = {'B': self.inv_B['fileid']}
        self.assertEqual(expected, self.get_previous_heads(inventories))
410
def test_fileid_in_two_inventories_gives_both_entries(self):
    # Both 'B' and 'C' must be reported as heads, and the order in which
    # the two inventories are supplied must not affect the result.
    orderings = (
        [self.inv_B, self.inv_C],
        [self.inv_C, self.inv_B],
        )
    for inventories in orderings:
        expected = {
            'B': self.inv_B['fileid'],
            'C': self.inv_C['fileid'],
            }
        self.assertEqual(expected, self.get_previous_heads(inventories))
418
def test_fileid_in_two_inventories_already_merged_gives_head(self):
    # Given inv_B and inv_D together, only 'D' is expected as a head,
    # regardless of the order in which the inventories are supplied.
    orderings = (
        [self.inv_B, self.inv_D],
        [self.inv_D, self.inv_B],
        )
    for inventories in orderings:
        expected = {'D': self.inv_D['fileid']}
        self.assertEqual(expected, self.get_previous_heads(inventories))
424
# TODO: test two inventories with the same file revision
587
427
class TestDescribeChanges(TestCase):
642
482
self.assertEqual(expected_change, change)
645
class TestCHKInventory(TestCaseWithTransport):
647
def get_chk_bytes(self):
648
# The easiest way to get a CHK store is a development6 repository and
649
# then work with the chk_bytes attribute directly.
650
repo = self.make_repository(".", format="development6-rich-root")
652
self.addCleanup(repo.unlock)
653
repo.start_write_group()
654
self.addCleanup(repo.abort_write_group)
655
return repo.chk_bytes
657
def read_bytes(self, chk_bytes, key):
    """Return the fulltext bytes of the first record streamed for key.

    :param chk_bytes: An object providing get_record_stream().
    :param key: The single key to request from chk_bytes.
    :return: Whatever the first streamed record gives for "fulltext".
    """
    # Exactly one key is requested, so only the first record is consumed.
    records = chk_bytes.get_record_stream([key], 'unordered', True)
    first_record = records.next()
    return first_record.get_bytes_as("fulltext")
661
def test_deserialise_gives_CHKInventory(self):
663
inv.revision_id = "revid"
664
inv.root.revision = "rootrev"
665
chk_bytes = self.get_chk_bytes()
666
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
667
bytes = ''.join(chk_inv.to_lines())
668
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
669
self.assertEqual("revid", new_inv.revision_id)
670
self.assertEqual("directory", new_inv.root.kind)
671
self.assertEqual(inv.root.file_id, new_inv.root.file_id)
672
self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
673
self.assertEqual(inv.root.name, new_inv.root.name)
674
self.assertEqual("rootrev", new_inv.root.revision)
675
self.assertEqual('plain', new_inv._search_key_name)
677
def test_deserialise_wrong_revid(self):
679
inv.revision_id = "revid"
680
inv.root.revision = "rootrev"
681
chk_bytes = self.get_chk_bytes()
682
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
683
bytes = ''.join(chk_inv.to_lines())
684
self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
687
def test_captures_rev_root_byid(self):
689
inv.revision_id = "foo"
690
inv.root.revision = "bar"
691
chk_bytes = self.get_chk_bytes()
692
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
693
lines = chk_inv.to_lines()
696
'revision_id: foo\n',
697
'root_id: TREE_ROOT\n',
698
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
699
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
701
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
702
self.assertEqual('plain', chk_inv._search_key_name)
704
def test_captures_parent_id_basename_index(self):
706
inv.revision_id = "foo"
707
inv.root.revision = "bar"
708
chk_bytes = self.get_chk_bytes()
709
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
710
lines = chk_inv.to_lines()
713
'revision_id: foo\n',
714
'root_id: TREE_ROOT\n',
715
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
716
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
718
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
719
self.assertEqual('plain', chk_inv._search_key_name)
721
def test_captures_search_key_name(self):
723
inv.revision_id = "foo"
724
inv.root.revision = "bar"
725
chk_bytes = self.get_chk_bytes()
726
chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
727
search_key_name='hash-16-way')
728
lines = chk_inv.to_lines()
731
'search_key_name: hash-16-way\n',
732
'root_id: TREE_ROOT\n',
733
'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
734
'revision_id: foo\n',
735
'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
737
chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
738
self.assertEqual('hash-16-way', chk_inv._search_key_name)
740
def test_directory_children_on_demand(self):
742
inv.revision_id = "revid"
743
inv.root.revision = "rootrev"
744
inv.add(InventoryFile("fileid", "file", inv.root.file_id))
745
inv["fileid"].revision = "filerev"
746
inv["fileid"].executable = True
747
inv["fileid"].text_sha1 = "ffff"
748
inv["fileid"].text_size = 1
749
chk_bytes = self.get_chk_bytes()
750
chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
751
bytes = ''.join(chk_inv.to_lines())
752
new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
753
root_entry = new_inv[inv.root.file_id]
754
self.assertEqual(None, root_entry._children)
755
self.assertEqual(['file'], root_entry.children.keys())
756
file_direct = new_inv["fileid"]
757
file_found = root_entry.children['file']
758
self.assertEqual(file_direct.kind, file_found.kind)
759
self.assertEqual(file_direct.file_id, file_found.file_id)
760
self.assertEqual(file_direct.parent_id, file_found.parent_id)
761
self.assertEqual(file_direct.name, file_found.name)
762
self.assertEqual(file_direct.revision, file_found.revision)
763
self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
764
self.assertEqual(file_direct.text_size, file_found.text_size)
765
self.assertEqual(file_direct.executable, file_found.executable)
767
def test_from_inventory_maximum_size(self):
    """from_inventory applies its maximum_size argument to both maps.

    The third positional argument (120) must show up as ``maximum_size``
    on the root node of ``id_to_entry`` (key width 1) and of
    ``parent_id_basename_to_file_id`` (key width 2).
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
    # The root node is only materialised on request.
    chk_inv.id_to_entry._ensure_root()
    self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
    self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
    p_id_basename = chk_inv.parent_id_basename_to_file_id
    p_id_basename._ensure_root()
    self.assertEqual(120, p_id_basename._root_node.maximum_size)
    self.assertEqual(2, p_id_basename._root_node._key_width)
782
def test___iter__(self):
    """Iterating a deserialised CHKInventory yields every file id."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named 'serialised' rather than 'bytes' to avoid shadowing the builtin.
    serialised = ''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    # Sort so the check does not depend on the inventory's iteration
    # order; the expected list below is already in sorted order.
    fileids = sorted(new_inv.__iter__())
    self.assertEqual([inv.root.file_id, "fileid"], fileids)
799
def test__len__(self):
    """len() of a CHKInventory counts the root plus each added entry."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Two entries: the root directory and the single file.
    self.assertEqual(2, len(chk_inv))
812
def test___getitem__(self):
    """Lookup by file id returns fully populated entries.

    Checks every stored attribute of the root directory and of the file
    after a serialise/deserialise round trip, and that an unknown id
    raises ``errors.NoSuchId``.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named 'serialised' rather than 'bytes' to avoid shadowing the builtin.
    serialised = ''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    root_entry = new_inv[inv.root.file_id]
    file_entry = new_inv["fileid"]
    # Root directory entry.
    self.assertEqual("directory", root_entry.kind)
    self.assertEqual(inv.root.file_id, root_entry.file_id)
    self.assertEqual(inv.root.parent_id, root_entry.parent_id)
    self.assertEqual(inv.root.name, root_entry.name)
    self.assertEqual("rootrev", root_entry.revision)
    # File entry.
    self.assertEqual("file", file_entry.kind)
    self.assertEqual("fileid", file_entry.file_id)
    self.assertEqual(inv.root.file_id, file_entry.parent_id)
    self.assertEqual("file", file_entry.name)
    self.assertEqual("filerev", file_entry.revision)
    self.assertEqual("ffff", file_entry.text_sha1)
    self.assertEqual(1, file_entry.text_size)
    self.assertEqual(True, file_entry.executable)
    self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
842
def test_has_id_true(self):
    """has_id is true for both the added file and the root entry."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    self.assertTrue(chk_inv.has_id('fileid'))
    self.assertTrue(chk_inv.has_id(inv.root.file_id))
856
def test_has_id_not(self):
    """has_id is false for a file id that was never added."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    self.assertFalse(chk_inv.has_id('fileid'))
864
def test_id2path(self):
    """id2path maps file ids to paths for root, directory and file."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
    fileentry = InventoryFile("fileid", "file", "dirid")
    # The directory goes in first because the file names it as parent.
    inv.add(direntry)
    inv.add(fileentry)
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    inv["dirid"].revision = "filerev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named 'serialised' rather than 'bytes' to avoid shadowing the builtin.
    serialised = ''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    self.assertEqual('', new_inv.id2path(inv.root.file_id))
    self.assertEqual('dir', new_inv.id2path('dirid'))
    self.assertEqual('dir/file', new_inv.id2path('fileid'))
885
def test_path2id(self):
    """path2id maps paths to file ids for root, directory and file."""
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
    fileentry = InventoryFile("fileid", "file", "dirid")
    # The directory goes in first because the file names it as parent.
    inv.add(direntry)
    inv.add(fileentry)
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    inv["dirid"].revision = "filerev"
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named 'serialised' rather than 'bytes' to avoid shadowing the builtin.
    serialised = ''.join(chk_inv.to_lines())
    new_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    self.assertEqual(inv.root.file_id, new_inv.path2id(''))
    self.assertEqual('dirid', new_inv.path2id('dir'))
    self.assertEqual('fileid', new_inv.path2id('dir/file'))
906
def test_create_by_apply_delta_sets_root(self):
    """A delta that swaps the root entry updates the new inventory's root.

    The delta removes the base inventory's root and adds "myrootid" at
    the empty path; the result must have the same root as a CHKInventory
    built directly from the modified plain inventory.
    """
    # The base needs a root of its own: the delta below reads
    # base_inv.root.file_id.
    inv = Inventory()
    inv.revision_id = "revid"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    inv.add_path("", "directory", "myrootid", None)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Delta items are (old_path, new_path, file_id, new_entry).
    delta = [("", None, base_inv.root.file_id, None),
             (None, "", "myrootid", inv.root)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    self.assertEqual(reference_inv.root, new_inv.root)
919
def test_create_by_apply_delta_empty_add_child(self):
    """Adding one child by delta matches building the inventory directly.

    Applies a delta that adds file "A" to a root-only CHKInventory and
    compares revision id, root id and the ``id_to_entry`` root node key
    against a reference built from a plain inventory holding the same
    entry.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile("A-id", "A", inv.root.file_id)
    a_entry.revision = "filerev"
    a_entry.executable = True
    a_entry.text_sha1 = "ffff"
    a_entry.text_size = 1
    # The reference inventory must hold the entry the delta will add.
    inv.add(a_entry)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", "A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    reference_inv.id_to_entry._ensure_root()
    new_inv.id_to_entry._ensure_root()
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
943
def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
    """Adding a child by delta also updates the parent/basename index.

    Same scenario as adding one child to a root-only inventory, but the
    ``parent_id_basename_to_file_id`` root node key is compared with the
    reference as well as the ``id_to_entry`` one.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    chk_bytes = self.get_chk_bytes()
    base_inv = CHKInventory.from_inventory(chk_bytes, inv)
    a_entry = InventoryFile("A-id", "A", inv.root.file_id)
    a_entry.revision = "filerev"
    a_entry.executable = True
    a_entry.text_sha1 = "ffff"
    a_entry.text_size = 1
    # The reference inventory must hold the entry the delta will add.
    inv.add(a_entry)
    inv.revision_id = "expectedid"
    reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
    delta = [(None, "A", "A-id", a_entry)]
    new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
    reference_inv.id_to_entry._ensure_root()
    reference_inv.parent_id_basename_to_file_id._ensure_root()
    new_inv.id_to_entry._ensure_root()
    new_inv.parent_id_basename_to_file_id._ensure_root()
    # new_inv should be the same as reference_inv.
    self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
    self.assertEqual(reference_inv.root_id, new_inv.root_id)
    self.assertEqual(reference_inv.id_to_entry._root_node._key,
                     new_inv.id_to_entry._root_node._key)
    self.assertEqual(
        reference_inv.parent_id_basename_to_file_id._root_node._key,
        new_inv.parent_id_basename_to_file_id._root_node._key)
971
def test_iter_changes(self):
    """iter_changes reports a file whose content and exec bit changed.

    Two inventories hold the same file id with different revision,
    sha1, size and executable flag; ``inv_1.iter_changes(inv_2)`` must
    yield exactly one change record for that file.
    """
    # Low level bootstrapping smoke test; comprehensive generic tests via
    # InterTree are coming.
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    inv2 = Inventory()
    inv2.revision_id = "revid2"
    inv2.root.revision = "rootrev"
    inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv2["fileid"].revision = "filerev2"
    inv2["fileid"].executable = False
    inv2["fileid"].text_sha1 = "bbbb"
    inv2["fileid"].text_size = 2
    # get fresh objects.
    chk_bytes = self.get_chk_bytes()
    chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
    serialised = ''.join(chk_inv.to_lines())
    inv_1 = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
    serialised = ''.join(chk_inv2.to_lines())
    inv_2 = CHKInventory.deserialise(chk_bytes, serialised, ("revid2",))
    # Record layout: (file_id, paths, changed_content, versioned,
    # parent ids, names, kinds, executable).  Each pair is
    # (basis value, this inventory's value); inv_2 is the basis here,
    # so the executable pair is (False, True).
    self.assertEqual(
        [('fileid', (u'file', u'file'), True, (True, True),
          ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
          (False, True))],
        list(inv_1.iter_changes(inv_2)))
1003
def test_parent_id_basename_to_file_id_index_enabled(self):
    """A deserialised CHKInventory carries the (parent, basename) index.

    The index must be a ``chk_map.CHKMap`` that maps
    ``(parent_id, basename)`` keys to file ids for the root and the file.
    """
    inv = Inventory()
    inv.revision_id = "revid"
    inv.root.revision = "rootrev"
    inv.add(InventoryFile("fileid", "file", inv.root.file_id))
    inv["fileid"].revision = "filerev"
    inv["fileid"].executable = True
    inv["fileid"].text_sha1 = "ffff"
    inv["fileid"].text_size = 1
    # get fresh objects.
    chk_bytes = self.get_chk_bytes()
    tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
    # Named 'serialised' rather than 'bytes' to avoid shadowing the builtin.
    serialised = ''.join(tmp_inv.to_lines())
    chk_inv = CHKInventory.deserialise(chk_bytes, serialised, ("revid",))
    self.assertIsInstance(chk_inv.parent_id_basename_to_file_id,
                          chk_map.CHKMap)
    self.assertEqual(
        {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
        dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1022
def test_file_entry_to_bytes(self):
    """An executable file entry round-trips through the byte form.

    Checks the exact serialised string, that parsing it gives back an
    equal entry with a unicode name, and the (utf8 name, file id,
    revision) key extracted from the bytes.
    """
    inv = CHKInventory(None)
    ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
    ie.executable = True
    ie.revision = 'file-rev-id'
    ie.text_sha1 = 'abcdefgh'
    # The expected serialisation below carries a size field of 100.
    ie.text_size = 100
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('file: file-id\nparent-id\nfilename\n'
                     'file-rev-id\nabcdefgh\n100\nY', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('filename', 'file-id', 'file-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1038
def test_file2_entry_to_bytes(self):
    """A non-executable file with a non-ASCII name round-trips.

    Same checks as the ASCII file case, with a name containing U+03A9
    and the executable flag off (serialised as 'N').
    """
    inv = CHKInventory(None)
    # u'\u03a9' appears as the UTF-8 bytes '\xce\xa9' in the serialised
    # form asserted below.
    ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
    ie.executable = False
    ie.revision = 'file-rev-id'
    ie.text_sha1 = '123456'
    # The expected serialisation below carries a size field of 25.
    ie.text_size = 25
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
                     'file-rev-id\n123456\n25\nN', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1055
def test_dir_entry_to_bytes(self):
    """A directory entry round-trips through the byte form.

    Verifies the serialised string, that parsing it yields an equal
    entry whose name is unicode, and the utf8-name key derived from the
    bytes.
    """
    chk_inv = CHKInventory(None)
    dir_entry = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
    dir_entry.revision = 'dir-rev-id'

    serialised = chk_inv._entry_to_bytes(dir_entry)
    expected = 'dir: dir-id\nparent-id\ndirname\ndir-rev-id'
    self.assertEqual(expected, serialised)

    parsed = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(dir_entry, parsed)
    self.assertIsInstance(parsed.name, unicode)

    expected_key = ('dirname', 'dir-id', 'dir-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1067
def test_dir2_entry_to_bytes(self):
    """A parentless directory with a non-ASCII name round-trips.

    The parent id is None, which serialises as an empty line and must
    come back as None (not an empty string) after parsing.
    """
    inv = CHKInventory(None)
    # parent_id is None: the expected bytes have an empty parent line and
    # the parsed entry is asserted to have parent_id None.
    ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
                                      None)
    ie.revision = 'dir-rev-id'
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
                     'dir-rev-id', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertIs(ie2.parent_id, None)
    self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
1082
def test_symlink_entry_to_bytes(self):
    """A symlink entry round-trips through the byte form.

    Verifies the serialised string, that parsing it yields an equal
    entry whose name and target are unicode, and the utf8-name key
    derived from the bytes.
    """
    chk_inv = CHKInventory(None)
    link_entry = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
    link_entry.revision = 'link-rev-id'
    link_entry.symlink_target = u'target/path'

    serialised = chk_inv._entry_to_bytes(link_entry)
    expected = ('symlink: link-id\nparent-id\nlinkname\n'
                'link-rev-id\ntarget/path')
    self.assertEqual(expected, serialised)

    parsed = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link_entry, parsed)
    self.assertIsInstance(parsed.name, unicode)
    self.assertIsInstance(parsed.symlink_target, unicode)

    expected_key = ('linkname', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1097
def test_symlink2_entry_to_bytes(self):
    """A symlink with non-ASCII name and target round-trips.

    Both the name and the target contain U+03A9, which appears as the
    bytes '\\xce\\xa9' in the serialised form; after parsing, both
    attributes must be unicode again.
    """
    chk_inv = CHKInventory(None)
    link_entry = inventory.InventoryLink(
        'link-id', u'link\u03a9name', 'parent-id')
    link_entry.revision = 'link-rev-id'
    link_entry.symlink_target = u'target/\u03a9path'

    serialised = chk_inv._entry_to_bytes(link_entry)
    expected = ('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
                'link-rev-id\ntarget/\xce\xa9path')
    self.assertEqual(expected, serialised)

    parsed = chk_inv._bytes_to_entry(serialised)
    self.assertEqual(link_entry, parsed)
    self.assertIsInstance(parsed.name, unicode)
    self.assertIsInstance(parsed.symlink_target, unicode)

    expected_key = ('link\xce\xa9name', 'link-id', 'link-rev-id')
    self.assertEqual(expected_key,
                     chk_inv._bytes_to_utf8name_key(serialised))
1112
def test_tree_reference_entry_to_bytes(self):
    """A tree-reference entry round-trips through the byte form.

    Checks the exact serialised string (including the reference
    revision), that parsing it gives back an equal entry with a unicode
    name, and the utf8-name key extracted from the bytes.
    """
    inv = CHKInventory(None)
    # The parent id matches the 'parent-id' line of the expected bytes.
    ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
                                 'parent-id')
    ie.revision = 'tree-rev-id'
    ie.reference_revision = 'ref-rev-id'
    entry_bytes = inv._entry_to_bytes(ie)
    self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
                     'tree-rev-id\nref-rev-id', entry_bytes)
    ie2 = inv._bytes_to_entry(entry_bytes)
    self.assertEqual(ie, ie2)
    self.assertIsInstance(ie2.name, unicode)
    self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
                     inv._bytes_to_utf8name_key(entry_bytes))
485
# Test case checking that the executable flag of two files ('a' executable,
# 'b' not) is reported consistently by the working tree, by a committed
# revision tree, after revert, and in a second tree cloned from the branch.
#
# NOTE(review): this text looks like an extraction from an annotated/diff
# viewer: the bare integers between statements appear to be viewer line
# numbers, indentation has been lost, and gaps in those numbers suggest
# statements were dropped. In particular `b` and `b2` are used below but
# never bound in the visible text - restore from the original file before
# relying on this test.
class TestExecutable(TestCaseWithTransport):
487
# Single scenario test: create, commit, remove, revert, clone and compare
# the execute bit at each step.
def test_stays_executable(self):
488
a_id = "a-20051208024829-849e76f7968d7a86"
489
b_id = "b-20051208024829-849e76f7968d7a86"
490
wt = self.make_branch_and_tree('b1')
492
# Create both files through a TreeTransform; the last argument of
# new_file is True for 'a' and False for 'b'.
# NOTE(review): no apply/finalize call on `tt` is visible here - presumably
# one of the dropped lines; confirm against the original.
tt = TreeTransform(wt)
493
tt.new_file('a', tt.root, 'a test\n', a_id, True)
494
tt.new_file('b', tt.root, 'b test\n', b_id, False)
497
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
499
# reopen the tree and ensure it stuck.
500
wt = wt.bzrdir.open_workingtree()
501
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
503
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
504
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
506
wt.commit('adding a,b', rev_id='r1')
508
# NOTE(review): `b` is not bound anywhere in the visible text; it is used
# as an object with `.repository` and `.bzrdir` - presumably the tree's
# branch, confirm.
rev_tree = b.repository.revision_tree('r1')
509
self.failUnless(rev_tree.is_executable(a_id), "'a' lost the execute bit")
510
self.failIf(rev_tree.is_executable(b_id), "'b' gained an execute bit")
512
self.failUnless(rev_tree.inventory[a_id].executable)
513
self.failIf(rev_tree.inventory[b_id].executable)
515
# Make sure the entries are gone
# NOTE(review): the statement that removes 'a' and 'b' from the tree is not
# visible here (gap in the numbering) - confirm against the original.
518
self.failIf(wt.has_id(a_id))
519
self.failIf(wt.has_filename('a'))
520
self.failIf(wt.has_id(b_id))
521
self.failIf(wt.has_filename('b'))
523
# Make sure that revert is able to bring them back,
524
# and sets 'a' back to being executable
526
wt.revert(['a', 'b'], rev_tree, backups=False)
527
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
529
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
530
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
532
# Now remove them again, and make sure that after a
533
# commit, they are still marked correctly
# NOTE(review): the second removal is likewise not visible before this
# commit - confirm against the original.
536
wt.commit('removed', rev_id='r2')
538
self.assertEqual([], [cn for cn,ie in wt.inventory.iter_entries()])
539
self.failIf(wt.has_id(a_id))
540
self.failIf(wt.has_filename('a'))
541
self.failIf(wt.has_id(b_id))
542
self.failIf(wt.has_filename('b'))
544
# Now revert back to the previous commit
545
wt.revert([], rev_tree, backups=False)
546
self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
548
self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
549
self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
551
# Now make sure that 'bzr branch' also preserves the
553
# TODO: Maybe this should be a blackbox test
554
# Clone at revision 'r1' into directory 'b2' and open its working tree.
d2 = b.bzrdir.clone('b2', revision_id='r1')
555
t2 = d2.open_workingtree()
557
# NOTE(review): `b2` is not bound in the visible text - presumably the
# branch of the cloned tree; confirm.
self.assertEquals('r1', b2.last_revision())
559
self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
560
self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
561
self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
563
# Make sure pull will delete the files
# NOTE(review): the pull itself is not visible here (gap in the numbering).
565
self.assertEquals('r2', b2.last_revision())
566
self.assertEqual([], [cn for cn,ie in t2.inventory.iter_entries()])
568
# Now commit the changes on the first branch
569
# so that the second branch can pull the changes
570
# and make sure that the executable bit has been copied
571
wt.commit('resurrected', rev_id='r3')
574
self.assertEquals('r3', b2.last_revision())
575
self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
577
self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
578
self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
581
class TestRevert(TestCaseWithTransport):
583
def test_dangling_id(self):
584
wt = self.make_branch_and_tree('b1')
585
self.assertEqual(len(wt.inventory), 1)
586
open('b1/a', 'wb').write('a test\n')
588
self.assertEqual(len(wt.inventory), 2)
591
self.assertEqual(len(wt.inventory), 1)