~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

[merge] robert's knit-performance work

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from bzrlib import (
19
 
    chk_map,
20
 
    groupcompress,
21
 
    errors,
22
 
    inventory,
23
 
    osutils,
24
 
    repository,
25
 
    revision,
26
 
    tests,
27
 
    workingtree,
28
 
    )
29
 
from bzrlib.inventory import (
30
 
    CHKInventory,
31
 
    Inventory,
32
 
    ROOT_ID,
33
 
    InventoryFile,
34
 
    InventoryDirectory,
35
 
    InventoryEntry,
36
 
    TreeReference,
37
 
    mutable_inventory_from_tree,
38
 
    )
39
 
from bzrlib.tests import (
40
 
    TestCase,
41
 
    TestCaseWithTransport,
42
 
    )
43
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
44
 
 
45
 
 
46
 
load_tests = load_tests_apply_scenarios
47
 
 
48
 
 
49
 
def delta_application_scenarios():
50
 
    scenarios = [
51
 
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
52
 
        ]
53
 
    # Working tree basis delta application
54
 
    # Repository add_inv_by_delta.
55
 
    # Reduce form of the per_repository test logic - that logic needs to be
56
 
    # be able to get /just/ repositories whereas these tests are fine with
57
 
    # just creating trees.
58
 
    formats = set()
59
 
    for _, format in repository.format_registry.iteritems():
60
 
        if format.supports_full_versioned_files:
61
 
            scenarios.append((str(format.__name__), {
62
 
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
63
 
                'format':format}))
64
 
    for format in workingtree.format_registry._get_all():
65
 
        repo_fmt = format._matchingbzrdir.repository_format
66
 
        if not repo_fmt.supports_full_versioned_files:
67
 
            continue
68
 
        scenarios.append(
69
 
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
70
 
            'apply_delta':apply_inventory_WT_basis,
71
 
            'format':format}))
72
 
        scenarios.append(
73
 
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
74
 
            'apply_delta':apply_inventory_WT,
75
 
            'format':format}))
76
 
    return scenarios
77
 
 
78
 
 
79
 
def create_texts_for_inv(repo, inv):
80
 
    for path, ie in inv.iter_entries():
81
 
        if ie.text_size:
82
 
            lines = ['a' * ie.text_size]
83
 
        else:
84
 
            lines = []
85
 
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
86
 
 
87
 
 
88
 
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
89
 
    """Apply delta to basis and return the result.
90
 
 
91
 
    :param basis: An inventory to be used as the basis.
92
 
    :param delta: The inventory delta to apply:
93
 
    :return: An inventory resulting from the application.
94
 
    """
95
 
    basis.apply_delta(delta)
96
 
    return basis
97
 
 
98
 
 
99
 
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
100
 
    """Apply delta to basis and return the result.
101
 
 
102
 
    This sets the tree state to be basis, and then calls apply_inventory_delta.
103
 
 
104
 
    :param basis: An inventory to be used as the basis.
105
 
    :param delta: The inventory delta to apply:
106
 
    :return: An inventory resulting from the application.
107
 
    """
108
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
109
 
    control.create_repository()
110
 
    control.create_branch()
111
 
    tree = self.format.initialize(control)
112
 
    tree.lock_write()
113
 
    try:
114
 
        tree._write_inventory(basis)
115
 
    finally:
116
 
        tree.unlock()
117
 
    # Fresh object, reads disk again.
118
 
    tree = tree.bzrdir.open_workingtree()
119
 
    tree.lock_write()
120
 
    try:
121
 
        tree.apply_inventory_delta(delta)
122
 
    finally:
123
 
        tree.unlock()
124
 
    # reload tree - ensure we get what was written.
125
 
    tree = tree.bzrdir.open_workingtree()
126
 
    tree.lock_read()
127
 
    self.addCleanup(tree.unlock)
128
 
    if not invalid_delta:
129
 
        tree._validate()
130
 
    return tree.inventory
131
 
 
132
 
 
133
 
def _create_repo_revisions(repo, basis, delta, invalid_delta):
134
 
    repo.start_write_group()
135
 
    try:
136
 
        rev = revision.Revision('basis', timestamp=0, timezone=None,
137
 
            message="", committer="foo@example.com")
138
 
        basis.revision_id = 'basis'
139
 
        create_texts_for_inv(repo, basis)
140
 
        repo.add_revision('basis', rev, basis)
141
 
        if invalid_delta:
142
 
            # We don't want to apply the delta to the basis, because we expect
143
 
            # the delta is invalid.
144
 
            result_inv = basis
145
 
            result_inv.revision_id = 'result'
146
 
            target_entries = None
147
 
        else:
148
 
            result_inv = basis.create_by_apply_delta(delta, 'result')
149
 
            create_texts_for_inv(repo, result_inv)
150
 
            target_entries = list(result_inv.iter_entries_by_dir())
151
 
        rev = revision.Revision('result', timestamp=0, timezone=None,
152
 
            message="", committer="foo@example.com")
153
 
        repo.add_revision('result', rev, result_inv)
154
 
        repo.commit_write_group()
155
 
    except:
156
 
        repo.abort_write_group()
157
 
        raise
158
 
    return target_entries
159
 
 
160
 
 
161
 
def _get_basis_entries(tree):
162
 
    basis_tree = tree.basis_tree()
163
 
    basis_tree.lock_read()
164
 
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
165
 
    basis_tree.unlock()
166
 
    return basis_tree_entries
167
 
 
168
 
 
169
 
def _populate_different_tree(tree, basis, delta):
170
 
    """Put all entries into tree, but at a unique location."""
171
 
    added_ids = set()
172
 
    added_paths = set()
173
 
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
174
 
    for path, ie in basis.iter_entries_by_dir():
175
 
        if ie.file_id in added_ids:
176
 
            continue
177
 
        # We want a unique path for each of these, we use the file-id
178
 
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
179
 
        added_ids.add(ie.file_id)
180
 
    for old_path, new_path, file_id, ie in delta:
181
 
        if file_id in added_ids:
182
 
            continue
183
 
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
184
 
 
185
 
 
186
 
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
187
 
    """Apply delta to basis and return the result.
188
 
 
189
 
    This sets the parent and then calls update_basis_by_delta.
190
 
    It also puts the basis in the repository under both 'basis' and 'result' to
191
 
    allow safety checks made by the WT to succeed, and finally ensures that all
192
 
    items in the delta with a new path are present in the WT before calling
193
 
    update_basis_by_delta.
194
 
 
195
 
    :param basis: An inventory to be used as the basis.
196
 
    :param delta: The inventory delta to apply:
197
 
    :return: An inventory resulting from the application.
198
 
    """
199
 
    control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
200
 
    control.create_repository()
201
 
    control.create_branch()
202
 
    tree = test.format.initialize(control)
203
 
    tree.lock_write()
204
 
    try:
205
 
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
206
 
                                                delta, invalid_delta)
207
 
        # Set the basis state as the trees current state
208
 
        tree._write_inventory(basis)
209
 
        # This reads basis from the repo and puts it into the tree's local
210
 
        # cache, if it has one.
211
 
        tree.set_parent_ids(['basis'])
212
 
    finally:
213
 
        tree.unlock()
214
 
    # Fresh lock, reads disk again.
215
 
    tree.lock_write()
216
 
    try:
217
 
        tree.update_basis_by_delta('result', delta)
218
 
        if not invalid_delta:
219
 
            tree._validate()
220
 
    finally:
221
 
        tree.unlock()
222
 
    # reload tree - ensure we get what was written.
223
 
    tree = tree.bzrdir.open_workingtree()
224
 
    basis_tree = tree.basis_tree()
225
 
    basis_tree.lock_read()
226
 
    test.addCleanup(basis_tree.unlock)
227
 
    basis_inv = basis_tree.inventory
228
 
    if target_entries:
229
 
        basis_entries = list(basis_inv.iter_entries_by_dir())
230
 
        test.assertEqual(target_entries, basis_entries)
231
 
    return basis_inv
232
 
 
233
 
 
234
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
235
 
                                                      invalid_delta=True):
236
 
    """Apply delta to basis and return the result.
237
 
    
238
 
    This inserts basis as a whole inventory and then uses
239
 
    add_inventory_by_delta to add delta.
240
 
 
241
 
    :param basis: An inventory to be used as the basis.
242
 
    :param delta: The inventory delta to apply:
243
 
    :return: An inventory resulting from the application.
244
 
    """
245
 
    format = self.format()
246
 
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
247
 
    repo = format.initialize(control)
248
 
    repo.lock_write()
249
 
    try:
250
 
        repo.start_write_group()
251
 
        try:
252
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
253
 
                message="", committer="foo@example.com")
254
 
            basis.revision_id = 'basis'
255
 
            create_texts_for_inv(repo, basis)
256
 
            repo.add_revision('basis', rev, basis)
257
 
            repo.commit_write_group()
258
 
        except:
259
 
            repo.abort_write_group()
260
 
            raise
261
 
    finally:
262
 
        repo.unlock()
263
 
    repo.lock_write()
264
 
    try:
265
 
        repo.start_write_group()
266
 
        try:
267
 
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
268
 
                'result', ['basis'])
269
 
        except:
270
 
            repo.abort_write_group()
271
 
            raise
272
 
        else:
273
 
            repo.commit_write_group()
274
 
    finally:
275
 
        repo.unlock()
276
 
    # Fresh lock, reads disk again.
277
 
    repo = repo.bzrdir.open_repository()
278
 
    repo.lock_read()
279
 
    self.addCleanup(repo.unlock)
280
 
    return repo.get_inventory('result')
281
 
 
282
 
 
283
 
class TestInventoryUpdates(TestCase):
284
 
 
285
 
    def test_creation_from_root_id(self):
286
 
        # iff a root id is passed to the constructor, a root directory is made
287
 
        inv = inventory.Inventory(root_id='tree-root')
288
 
        self.assertNotEqual(None, inv.root)
289
 
        self.assertEqual('tree-root', inv.root.file_id)
290
 
 
291
 
    def test_add_path_of_root(self):
292
 
        # if no root id is given at creation time, there is no root directory
293
 
        inv = inventory.Inventory(root_id=None)
294
 
        self.assertIs(None, inv.root)
295
 
        # add a root entry by adding its path
296
 
        ie = inv.add_path("", "directory", "my-root")
297
 
        ie.revision = 'test-rev'
298
 
        self.assertEqual("my-root", ie.file_id)
299
 
        self.assertIs(ie, inv.root)
300
 
 
301
 
    def test_add_path(self):
302
 
        inv = inventory.Inventory(root_id='tree_root')
303
 
        ie = inv.add_path('hello', 'file', 'hello-id')
304
 
        self.assertEqual('hello-id', ie.file_id)
305
 
        self.assertEqual('file', ie.kind)
306
 
 
307
 
    def test_copy(self):
308
 
        """Make sure copy() works and creates a deep copy."""
309
 
        inv = inventory.Inventory(root_id='some-tree-root')
310
 
        ie = inv.add_path('hello', 'file', 'hello-id')
311
 
        inv2 = inv.copy()
312
 
        inv.root.file_id = 'some-new-root'
313
 
        ie.name = 'file2'
314
 
        self.assertEqual('some-tree-root', inv2.root.file_id)
315
 
        self.assertEqual('hello', inv2['hello-id'].name)
316
 
 
317
 
    def test_copy_empty(self):
318
 
        """Make sure an empty inventory can be copied."""
319
 
        inv = inventory.Inventory(root_id=None)
320
 
        inv2 = inv.copy()
321
 
        self.assertIs(None, inv2.root)
322
 
 
323
 
    def test_copy_copies_root_revision(self):
324
 
        """Make sure the revision of the root gets copied."""
325
 
        inv = inventory.Inventory(root_id='someroot')
326
 
        inv.root.revision = 'therev'
327
 
        inv2 = inv.copy()
328
 
        self.assertEquals('someroot', inv2.root.file_id)
329
 
        self.assertEquals('therev', inv2.root.revision)
330
 
 
331
 
    def test_create_tree_reference(self):
332
 
        inv = inventory.Inventory('tree-root-123')
333
 
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
334
 
                              revision='rev', reference_revision='rev2'))
335
 
 
336
 
    def test_error_encoding(self):
337
 
        inv = inventory.Inventory('tree-root')
338
 
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
339
 
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
340
 
            InventoryFile('b-id', u'\u1234', 'tree-root'))
341
 
        self.assertContainsRe(str(e), r'\\u1234')
342
 
 
343
 
    def test_add_recursive(self):
344
 
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
345
 
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
346
 
        parent.children[child.file_id] = child
347
 
        inv = inventory.Inventory('tree-root')
348
 
        inv.add(parent)
349
 
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
350
 
 
351
 
 
352
 
 
353
 
class TestDeltaApplication(TestCaseWithTransport):
354
 
 
355
 
    scenarios = delta_application_scenarios()
356
 
 
357
 
    def get_empty_inventory(self, reference_inv=None):
358
 
        """Get an empty inventory.
359
 
 
360
 
        Note that tests should not depend on the revision of the root for
361
 
        setting up test conditions, as it has to be flexible to accomodate non
362
 
        rich root repositories.
363
 
 
364
 
        :param reference_inv: If not None, get the revision for the root from
365
 
            this inventory. This is useful for dealing with older repositories
366
 
            that routinely discarded the root entry data. If None, the root's
367
 
            revision is set to 'basis'.
368
 
        """
369
 
        inv = inventory.Inventory()
370
 
        if reference_inv is not None:
371
 
            inv.root.revision = reference_inv.root.revision
372
 
        else:
373
 
            inv.root.revision = 'basis'
374
 
        return inv
375
 
 
376
 
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
377
 
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
378
 
        ie_file.revision = 'result'
379
 
        ie_file.text_size = 0
380
 
        ie_file.text_sha1 = ''
381
 
        return ie_file
382
 
 
383
 
    def test_empty_delta(self):
384
 
        inv = self.get_empty_inventory()
385
 
        delta = []
386
 
        inv = self.apply_delta(self, inv, delta)
387
 
        inv2 = self.get_empty_inventory(inv)
388
 
        self.assertEqual([], inv2._make_delta(inv))
389
 
 
390
 
    def test_None_file_id(self):
391
 
        inv = self.get_empty_inventory()
392
 
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
393
 
        dir1.revision = 'result'
394
 
        delta = [(None, u'dir1', None, dir1)]
395
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
396
 
            inv, delta)
397
 
 
398
 
    def test_unicode_file_id(self):
399
 
        inv = self.get_empty_inventory()
400
 
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
401
 
        dir1.revision = 'result'
402
 
        delta = [(None, u'dir1', dir1.file_id, dir1)]
403
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
404
 
            inv, delta)
405
 
 
406
 
    def test_repeated_file_id(self):
407
 
        inv = self.get_empty_inventory()
408
 
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
409
 
        file1.revision = 'result'
410
 
        file1.text_size = 0
411
 
        file1.text_sha1 = ""
412
 
        file2 = file1.copy()
413
 
        file2.name = 'path2'
414
 
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
415
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
416
 
            inv, delta)
417
 
 
418
 
    def test_repeated_new_path(self):
419
 
        inv = self.get_empty_inventory()
420
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
421
 
        file1.revision = 'result'
422
 
        file1.text_size = 0
423
 
        file1.text_sha1 = ""
424
 
        file2 = file1.copy()
425
 
        file2.file_id = 'id2'
426
 
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
427
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
428
 
            inv, delta)
429
 
 
430
 
    def test_repeated_old_path(self):
431
 
        inv = self.get_empty_inventory()
432
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
433
 
        file1.revision = 'result'
434
 
        file1.text_size = 0
435
 
        file1.text_sha1 = ""
436
 
        # We can't *create* a source inventory with the same path, but
437
 
        # a badly generated partial delta might claim the same source twice.
438
 
        # This would be buggy in two ways: the path is repeated in the delta,
439
 
        # And the path for one of the file ids doesn't match the source
440
 
        # location. Alternatively, we could have a repeated fileid, but that
441
 
        # is separately checked for.
442
 
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
443
 
        file2.revision = 'result'
444
 
        file2.text_size = 0
445
 
        file2.text_sha1 = ""
446
 
        inv.add(file1)
447
 
        inv.add(file2)
448
 
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
449
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
450
 
            inv, delta)
451
 
 
452
 
    def test_mismatched_id_entry_id(self):
453
 
        inv = self.get_empty_inventory()
454
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
455
 
        file1.revision = 'result'
456
 
        file1.text_size = 0
457
 
        file1.text_sha1 = ""
458
 
        delta = [(None, u'path', 'id', file1)]
459
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
460
 
            inv, delta)
461
 
 
462
 
    def test_mismatched_new_path_entry_None(self):
463
 
        inv = self.get_empty_inventory()
464
 
        delta = [(None, u'path', 'id', None)]
465
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
466
 
            inv, delta)
467
 
 
468
 
    def test_mismatched_new_path_None_entry(self):
469
 
        inv = self.get_empty_inventory()
470
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
471
 
        file1.revision = 'result'
472
 
        file1.text_size = 0
473
 
        file1.text_sha1 = ""
474
 
        delta = [(u"path", None, 'id1', file1)]
475
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
476
 
            inv, delta)
477
 
 
478
 
    def test_parent_is_not_directory(self):
479
 
        inv = self.get_empty_inventory()
480
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
481
 
        file1.revision = 'result'
482
 
        file1.text_size = 0
483
 
        file1.text_sha1 = ""
484
 
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
485
 
        file2.revision = 'result'
486
 
        file2.text_size = 0
487
 
        file2.text_sha1 = ""
488
 
        inv.add(file1)
489
 
        delta = [(None, u'path/path2', 'id2', file2)]
490
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
491
 
            inv, delta)
492
 
 
493
 
    def test_parent_is_missing(self):
494
 
        inv = self.get_empty_inventory()
495
 
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
496
 
        file2.revision = 'result'
497
 
        file2.text_size = 0
498
 
        file2.text_sha1 = ""
499
 
        delta = [(None, u'path/path2', 'id2', file2)]
500
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
501
 
            inv, delta)
502
 
 
503
 
    def test_new_parent_path_has_wrong_id(self):
504
 
        inv = self.get_empty_inventory()
505
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
506
 
        parent1.revision = 'result'
507
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
508
 
        parent2.revision = 'result'
509
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
510
 
        file1.revision = 'result'
511
 
        file1.text_size = 0
512
 
        file1.text_sha1 = ""
513
 
        inv.add(parent1)
514
 
        inv.add(parent2)
515
 
        # This delta claims that file1 is at dir/path, but actually its at
516
 
        # dir2/path if you follow the inventory parent structure.
517
 
        delta = [(None, u'dir/path', 'id', file1)]
518
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
519
 
            inv, delta)
520
 
 
521
 
    def test_old_parent_path_is_wrong(self):
522
 
        inv = self.get_empty_inventory()
523
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
524
 
        parent1.revision = 'result'
525
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
526
 
        parent2.revision = 'result'
527
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
528
 
        file1.revision = 'result'
529
 
        file1.text_size = 0
530
 
        file1.text_sha1 = ""
531
 
        inv.add(parent1)
532
 
        inv.add(parent2)
533
 
        inv.add(file1)
534
 
        # This delta claims that file1 was at dir/path, but actually it was at
535
 
        # dir2/path if you follow the inventory parent structure.
536
 
        delta = [(u'dir/path', None, 'id', None)]
537
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
538
 
            inv, delta)
539
 
 
540
 
    def test_old_parent_path_is_for_other_id(self):
541
 
        inv = self.get_empty_inventory()
542
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
543
 
        parent1.revision = 'result'
544
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
545
 
        parent2.revision = 'result'
546
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
547
 
        file1.revision = 'result'
548
 
        file1.text_size = 0
549
 
        file1.text_sha1 = ""
550
 
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
551
 
        file2.revision = 'result'
552
 
        file2.text_size = 0
553
 
        file2.text_sha1 = ""
554
 
        inv.add(parent1)
555
 
        inv.add(parent2)
556
 
        inv.add(file1)
557
 
        inv.add(file2)
558
 
        # This delta claims that file1 was at dir/path, but actually it was at
559
 
        # dir2/path if you follow the inventory parent structure. At dir/path
560
 
        # is another entry we should not delete.
561
 
        delta = [(u'dir/path', None, 'id', None)]
562
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
563
 
            inv, delta)
564
 
 
565
 
    def test_add_existing_id_new_path(self):
566
 
        inv = self.get_empty_inventory()
567
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
568
 
        parent1.revision = 'result'
569
 
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
570
 
        parent2.revision = 'result'
571
 
        inv.add(parent1)
572
 
        delta = [(None, u'dir2', 'p-1', parent2)]
573
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
574
 
            inv, delta)
575
 
 
576
 
    def test_add_new_id_existing_path(self):
577
 
        inv = self.get_empty_inventory()
578
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
579
 
        parent1.revision = 'result'
580
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
581
 
        parent2.revision = 'result'
582
 
        inv.add(parent1)
583
 
        delta = [(None, u'dir1', 'p-2', parent2)]
584
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
585
 
            inv, delta)
586
 
 
587
 
    def test_remove_dir_leaving_dangling_child(self):
588
 
        inv = self.get_empty_inventory()
589
 
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
590
 
        dir1.revision = 'result'
591
 
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
592
 
        dir2.revision = 'result'
593
 
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
594
 
        dir3.revision = 'result'
595
 
        inv.add(dir1)
596
 
        inv.add(dir2)
597
 
        inv.add(dir3)
598
 
        delta = [(u'dir1', None, 'p-1', None),
599
 
            (u'dir1/child2', None, 'p-3', None)]
600
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
601
 
            inv, delta)
602
 
 
603
 
    def test_add_file(self):
604
 
        inv = self.get_empty_inventory()
605
 
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
606
 
        file1.revision = 'result'
607
 
        file1.text_size = 0
608
 
        file1.text_sha1 = ''
609
 
        delta = [(None, u'path', 'file-id', file1)]
610
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
611
 
        self.assertEqual('file-id', res_inv['file-id'].file_id)
612
 
 
613
 
    def test_remove_file(self):
614
 
        inv = self.get_empty_inventory()
615
 
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
616
 
        file1.revision = 'result'
617
 
        file1.text_size = 0
618
 
        file1.text_sha1 = ''
619
 
        inv.add(file1)
620
 
        delta = [(u'path', None, 'file-id', None)]
621
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
622
 
        self.assertEqual(None, res_inv.path2id('path'))
623
 
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
624
 
 
625
 
    def test_rename_file(self):
626
 
        inv = self.get_empty_inventory()
627
 
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
628
 
        inv.add(file1)
629
 
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
630
 
        delta = [(u'path', 'path2', 'file-id', file2)]
631
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
632
 
        self.assertEqual(None, res_inv.path2id('path'))
633
 
        self.assertEqual('file-id', res_inv.path2id('path2'))
634
 
 
635
 
    def test_replaced_at_new_path(self):
636
 
        inv = self.get_empty_inventory()
637
 
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
638
 
        inv.add(file1)
639
 
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
640
 
        delta = [(u'name', None, 'id1', None),
641
 
                 (None, u'name', 'id2', file2)]
642
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
643
 
        self.assertEqual('id2', res_inv.path2id('name'))
644
 
 
645
 
    def test_rename_dir(self):
646
 
        inv = self.get_empty_inventory()
647
 
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
648
 
        dir1.revision = 'basis'
649
 
        file1 = self.make_file_ie(parent_id='dir-id')
650
 
        inv.add(dir1)
651
 
        inv.add(file1)
652
 
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
653
 
        dir2.revision = 'result'
654
 
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
655
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
656
 
        # The file should be accessible under the new path
657
 
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
658
 
 
659
 
    def test_renamed_dir_with_renamed_child(self):
660
 
        inv = self.get_empty_inventory()
661
 
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
662
 
        dir1.revision = 'basis'
663
 
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
664
 
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
665
 
        inv.add(dir1)
666
 
        inv.add(file1)
667
 
        inv.add(file2)
668
 
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
669
 
        dir2.revision = 'result'
670
 
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
671
 
        delta = [('dir1', 'dir2', 'dir-id', dir2),
672
 
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
673
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
674
 
        # The file should be accessible under the new path
675
 
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
676
 
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
677
 
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
678
 
 
679
 
    def test_is_root(self):
680
 
        """Ensure our root-checking code is accurate."""
681
 
        inv = inventory.Inventory('TREE_ROOT')
682
 
        self.assertTrue(inv.is_root('TREE_ROOT'))
683
 
        self.assertFalse(inv.is_root('booga'))
684
 
        inv.root.file_id = 'booga'
685
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
686
 
        self.assertTrue(inv.is_root('booga'))
687
 
        # works properly even if no root is set
688
 
        inv.root = None
689
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
690
 
        self.assertFalse(inv.is_root('booga'))
691
 
 
692
 
    def test_entries_for_empty_inventory(self):
693
 
        """Test that entries() will not fail for an empty inventory"""
694
 
        inv = Inventory(root_id=None)
695
 
        self.assertEqual([], inv.entries())
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import os
 
19
 
 
20
from bzrlib.branch import Branch
 
21
import bzrlib.errors as errors
 
22
from bzrlib.diff import internal_diff
 
23
from bzrlib.inventory import Inventory, ROOT_ID
 
24
import bzrlib.inventory as inventory
 
25
from bzrlib.osutils import has_symlinks, rename, pathjoin
 
26
from bzrlib.tests import TestCase, TestCaseWithTransport
 
27
 
 
28
 
 
29
class TestInventory(TestCase):
 
30
 
 
31
    def test_is_within(self):
 
32
        from bzrlib.osutils import is_inside_any
 
33
 
 
34
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
35
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
36
                         (['src'], SRC_FOO_C),
 
37
                         (['src'], 'src'),
 
38
                         ]:
 
39
            self.assert_(is_inside_any(dirs, fn))
 
40
            
 
41
        for dirs, fn in [(['src'], 'srccontrol'),
 
42
                         (['src'], 'srccontrol/foo')]:
 
43
            self.assertFalse(is_inside_any(dirs, fn))
 
44
            
 
45
    def test_ids(self):
 
46
        """Test detection of files within selected directories."""
 
47
        inv = Inventory()
 
48
        
 
49
        for args in [('src', 'directory', 'src-id'), 
 
50
                     ('doc', 'directory', 'doc-id'), 
 
51
                     ('src/hello.c', 'file'),
 
52
                     ('src/bye.c', 'file', 'bye-id'),
 
53
                     ('Makefile', 'file')]:
 
54
            inv.add_path(*args)
 
55
            
 
56
        self.assertEqual(inv.path2id('src'), 'src-id')
 
57
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
 
58
        
 
59
        self.assert_('src-id' in inv)
 
60
 
 
61
 
 
62
    def test_version(self):
 
63
        """Inventory remembers the text's version."""
 
64
        inv = Inventory()
 
65
        ie = inv.add_path('foo.txt', 'file')
 
66
        ## XXX
696
67
 
697
68
 
698
69
class TestInventoryEntry(TestCase):
711
82
 
712
83
    def test_dir_detect_changes(self):
713
84
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
85
        left.text_sha1 = 123
 
86
        left.executable = True
 
87
        left.symlink_target='foo'
714
88
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
89
        right.text_sha1 = 321
 
90
        right.symlink_target='bar'
715
91
        self.assertEqual((False, False), left.detect_changes(right))
716
92
        self.assertEqual((False, False), right.detect_changes(left))
717
93
 
731
107
 
732
108
    def test_symlink_detect_changes(self):
733
109
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
110
        left.text_sha1 = 123
 
111
        left.executable = True
734
112
        left.symlink_target='foo'
735
113
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
114
        right.text_sha1 = 321
736
115
        right.symlink_target='foo'
737
116
        self.assertEqual((False, False), left.detect_changes(right))
738
117
        self.assertEqual((False, False), right.detect_changes(left))
742
121
 
743
122
    def test_file_has_text(self):
744
123
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
745
 
        self.assertTrue(file.has_text())
 
124
        self.failUnless(file.has_text())
746
125
 
747
126
    def test_directory_has_text(self):
748
127
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
749
 
        self.assertFalse(dir.has_text())
 
128
        self.failIf(dir.has_text())
750
129
 
751
130
    def test_link_has_text(self):
752
131
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
753
 
        self.assertFalse(link.has_text())
754
 
 
755
 
    def test_make_entry(self):
756
 
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
757
 
            inventory.InventoryFile)
758
 
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
759
 
            inventory.InventoryLink)
760
 
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
761
 
            inventory.InventoryDirectory)
762
 
 
763
 
    def test_make_entry_non_normalized(self):
764
 
        orig_normalized_filename = osutils.normalized_filename
765
 
 
 
132
        self.failIf(link.has_text())
 
133
 
 
134
 
 
135
class TestEntryDiffing(TestCaseWithTransport):
 
136
 
 
137
    def setUp(self):
 
138
        super(TestEntryDiffing, self).setUp()
 
139
        self.wt = self.make_branch_and_tree('.')
 
140
        self.branch = self.wt.branch
 
141
        print >> open('file', 'wb'), 'foo'
 
142
        self.wt.add(['file'], ['fileid'])
 
143
        if has_symlinks():
 
144
            os.symlink('target1', 'symlink')
 
145
            self.wt.add(['symlink'], ['linkid'])
 
146
        self.wt.commit('message_1', rev_id = '1')
 
147
        print >> open('file', 'wb'), 'bar'
 
148
        if has_symlinks():
 
149
            os.unlink('symlink')
 
150
            os.symlink('target2', 'symlink')
 
151
        self.tree_1 = self.branch.repository.revision_tree('1')
 
152
        self.inv_1 = self.branch.repository.get_inventory('1')
 
153
        self.file_1 = self.inv_1['fileid']
 
154
        self.tree_2 = self.wt
 
155
        self.inv_2 = self.tree_2.read_working_inventory()
 
156
        self.file_2 = self.inv_2['fileid']
 
157
        if has_symlinks():
 
158
            self.link_1 = self.inv_1['linkid']
 
159
            self.link_2 = self.inv_2['linkid']
 
160
 
 
161
    def test_file_diff_deleted(self):
 
162
        output = StringIO()
 
163
        self.file_1.diff(internal_diff, 
 
164
                          "old_label", self.tree_1,
 
165
                          "/dev/null", None, None,
 
166
                          output)
 
167
        self.assertEqual(output.getvalue(), "--- old_label\t\n"
 
168
                                            "+++ /dev/null\t\n"
 
169
                                            "@@ -1,1 +0,0 @@\n"
 
170
                                            "-foo\n"
 
171
                                            "\n")
 
172
 
 
173
    def test_file_diff_added(self):
 
174
        output = StringIO()
 
175
        self.file_1.diff(internal_diff, 
 
176
                          "new_label", self.tree_1,
 
177
                          "/dev/null", None, None,
 
178
                          output, reverse=True)
 
179
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
 
180
                                            "+++ new_label\t\n"
 
181
                                            "@@ -0,0 +1,1 @@\n"
 
182
                                            "+foo\n"
 
183
                                            "\n")
 
184
 
 
185
    def test_file_diff_changed(self):
 
186
        output = StringIO()
 
187
        self.file_1.diff(internal_diff, 
 
188
                          "/dev/null", self.tree_1, 
 
189
                          "new_label", self.file_2, self.tree_2,
 
190
                          output)
 
191
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
 
192
                                            "+++ new_label\t\n"
 
193
                                            "@@ -1,1 +1,1 @@\n"
 
194
                                            "-foo\n"
 
195
                                            "+bar\n"
 
196
                                            "\n")
 
197
        
 
198
    def test_link_diff_deleted(self):
 
199
        if not has_symlinks():
 
200
            return
 
201
        output = StringIO()
 
202
        self.link_1.diff(internal_diff, 
 
203
                          "old_label", self.tree_1,
 
204
                          "/dev/null", None, None,
 
205
                          output)
 
206
        self.assertEqual(output.getvalue(),
 
207
                         "=== target was 'target1'\n")
 
208
 
 
209
    def test_link_diff_added(self):
 
210
        if not has_symlinks():
 
211
            return
 
212
        output = StringIO()
 
213
        self.link_1.diff(internal_diff, 
 
214
                          "new_label", self.tree_1,
 
215
                          "/dev/null", None, None,
 
216
                          output, reverse=True)
 
217
        self.assertEqual(output.getvalue(),
 
218
                         "=== target is 'target1'\n")
 
219
 
 
220
    def test_link_diff_changed(self):
 
221
        if not has_symlinks():
 
222
            return
 
223
        output = StringIO()
 
224
        self.link_1.diff(internal_diff, 
 
225
                          "/dev/null", self.tree_1, 
 
226
                          "new_label", self.link_2, self.tree_2,
 
227
                          output)
 
228
        self.assertEqual(output.getvalue(),
 
229
                         "=== target changed 'target1' => 'target2'\n")
 
230
 
 
231
 
 
232
class TestSnapshot(TestCaseWithTransport):
 
233
 
 
234
    def setUp(self):
 
235
        # for full testing we'll need a branch
 
236
        # with a subdir to test parent changes.
 
237
        # and a file, link and dir under that.
 
238
        # but right now I only need one attribute
 
239
        # to change, and then test merge patterns
 
240
        # with fake parent entries.
 
241
        super(TestSnapshot, self).setUp()
 
242
        self.wt = self.make_branch_and_tree('.')
 
243
        self.branch = self.wt.branch
 
244
        self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
 
245
        self.wt.add(['subdir', 'subdir/file'],
 
246
                                       ['dirid', 'fileid'])
 
247
        if has_symlinks():
 
248
            pass
 
249
        self.wt.commit('message_1', rev_id = '1')
 
250
        self.tree_1 = self.branch.repository.revision_tree('1')
 
251
        self.inv_1 = self.branch.repository.get_inventory('1')
 
252
        self.file_1 = self.inv_1['fileid']
 
253
        self.file_active = self.wt.inventory['fileid']
 
254
 
 
255
    def test_snapshot_new_revision(self):
 
256
        # This tests that a simple commit with no parents makes a new
 
257
        # revision value in the inventory entry
 
258
        self.file_active.snapshot('2', 'subdir/file', {}, self.wt, 
 
259
                                  self.branch.repository.weave_store,
 
260
                                  self.branch.get_transaction())
 
261
        # expected outcome - file_1 has a revision id of '2', and we can get
 
262
        # its text of 'file contents' out of the weave.
 
263
        self.assertEqual(self.file_1.revision, '1')
 
264
        self.assertEqual(self.file_active.revision, '2')
 
265
        # this should be a separate test probably, but lets check it once..
 
266
        lines = self.branch.repository.weave_store.get_weave(
 
267
            'fileid', 
 
268
            self.branch.get_transaction()).get_lines('2')
 
269
        self.assertEqual(lines, ['contents of subdir/file\n'])
 
270
 
 
271
    def test_snapshot_unchanged(self):
 
272
        #This tests that a simple commit does not make a new entry for
 
273
        # an unchanged inventory entry
 
274
        self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
 
275
                                  self.wt, 
 
276
                                  self.branch.repository.weave_store,
 
277
                                  self.branch.get_transaction())
 
278
        self.assertEqual(self.file_1.revision, '1')
 
279
        self.assertEqual(self.file_active.revision, '1')
 
280
        vf = self.branch.repository.weave_store.get_weave(
 
281
            'fileid', 
 
282
            self.branch.repository.get_transaction())
 
283
        self.assertRaises(errors.RevisionNotPresent,
 
284
                          vf.get_lines,
 
285
                          '2')
 
286
 
 
287
    def test_snapshot_merge_identical_different_revid(self):
 
288
        # This tests that a commit with two identical parents, one of which has
 
289
        # a different revision id, results in a new revision id in the entry.
 
290
        # 1->other, commit a merge of other against 1, results in 2.
 
291
        other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
 
292
        other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
 
293
        other_ie.revision = '1'
 
294
        other_ie.text_sha1 = self.file_1.text_sha1
 
295
        other_ie.text_size = self.file_1.text_size
 
296
        self.assertEqual(self.file_1, other_ie)
 
297
        other_ie.revision = 'other'
 
298
        self.assertNotEqual(self.file_1, other_ie)
 
299
        versionfile = self.branch.repository.weave_store.get_weave(
 
300
            'fileid', self.branch.repository.get_transaction())
 
301
        versionfile.clone_text('other', '1', ['1'])
 
302
        self.file_active.snapshot('2', 'subdir/file', 
 
303
                                  {'1':self.file_1, 'other':other_ie},
 
304
                                  self.wt, 
 
305
                                  self.branch.repository.weave_store,
 
306
                                  self.branch.get_transaction())
 
307
        self.assertEqual(self.file_active.revision, '2')
 
308
 
 
309
    def test_snapshot_changed(self):
 
310
        # This tests that a commit with one different parent results in a new
 
311
        # revision id in the entry.
 
312
        self.file_active.name='newname'
 
313
        rename('subdir/file', 'subdir/newname')
 
314
        self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1}, 
 
315
                                  self.wt,
 
316
                                  self.branch.repository.weave_store,
 
317
                                  self.branch.get_transaction())
 
318
        # expected outcome - file_1 has a revision id of '2'
 
319
        self.assertEqual(self.file_active.revision, '2')
 
320
 
 
321
 
 
322
class TestPreviousHeads(TestCaseWithTransport):
 
323
 
 
324
    def setUp(self):
 
325
        # we want several inventories, that respectively
 
326
        # give use the following scenarios:
 
327
        # A) fileid not in any inventory (A),
 
328
        # B) fileid present in one inventory (B) and (A,B)
 
329
        # C) fileid present in two inventories, and they
 
330
        #   are not mutual descendents (B, C)
 
331
        # D) fileid present in two inventories and one is
 
332
        #   a descendent of the other. (B, D)
 
333
        super(TestPreviousHeads, self).setUp()
 
334
        self.wt = self.make_branch_and_tree('.')
 
335
        self.branch = self.wt.branch
 
336
        self.build_tree(['file'])
 
337
        self.wt.commit('new branch', allow_pointless=True, rev_id='A')
 
338
        self.inv_A = self.branch.repository.get_inventory('A')
 
339
        self.wt.add(['file'], ['fileid'])
 
340
        self.wt.commit('add file', rev_id='B')
 
341
        self.inv_B = self.branch.repository.get_inventory('B')
 
342
        self.branch.lock_write()
766
343
        try:
767
 
            osutils.normalized_filename = osutils._accessible_normalized_filename
768
 
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
769
 
            self.assertEqual(u'\xe5', entry.name)
770
 
            self.assertIsInstance(entry, inventory.InventoryFile)
771
 
 
772
 
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
773
 
            self.assertRaises(errors.InvalidNormalization,
774
 
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
344
            self.branch.control_files.put_utf8('revision-history', 'A\n')
775
345
        finally:
776
 
            osutils.normalized_filename = orig_normalized_filename
777
 
 
778
 
 
779
 
class TestDescribeChanges(TestCase):
780
 
 
781
 
    def test_describe_change(self):
782
 
        # we need to test the following change combinations:
783
 
        # rename
784
 
        # reparent
785
 
        # modify
786
 
        # gone
787
 
        # added
788
 
        # renamed/reparented and modified
789
 
        # change kind (perhaps can't be done yet?)
790
 
        # also, merged in combination with all of these?
791
 
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
792
 
        old_a.text_sha1 = '123132'
793
 
        old_a.text_size = 0
794
 
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
795
 
        new_a.text_sha1 = '123132'
796
 
        new_a.text_size = 0
797
 
 
798
 
        self.assertChangeDescription('unchanged', old_a, new_a)
799
 
 
800
 
        new_a.text_size = 10
801
 
        new_a.text_sha1 = 'abcabc'
802
 
        self.assertChangeDescription('modified', old_a, new_a)
803
 
 
804
 
        self.assertChangeDescription('added', None, new_a)
805
 
        self.assertChangeDescription('removed', old_a, None)
806
 
        # perhaps a bit questionable but seems like the most reasonable thing...
807
 
        self.assertChangeDescription('unchanged', None, None)
808
 
 
809
 
        # in this case it's both renamed and modified; show a rename and
810
 
        # modification:
811
 
        new_a.name = 'newfilename'
812
 
        self.assertChangeDescription('modified and renamed', old_a, new_a)
813
 
 
814
 
        # reparenting is 'renaming'
815
 
        new_a.name = old_a.name
816
 
        new_a.parent_id = 'somedir-id'
817
 
        self.assertChangeDescription('modified and renamed', old_a, new_a)
818
 
 
819
 
        # reset the content values so its not modified
820
 
        new_a.text_size = old_a.text_size
821
 
        new_a.text_sha1 = old_a.text_sha1
822
 
        new_a.name = old_a.name
823
 
 
824
 
        new_a.name = 'newfilename'
825
 
        self.assertChangeDescription('renamed', old_a, new_a)
826
 
 
827
 
        # reparenting is 'renaming'
828
 
        new_a.name = old_a.name
829
 
        new_a.parent_id = 'somedir-id'
830
 
        self.assertChangeDescription('renamed', old_a, new_a)
831
 
 
832
 
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
833
 
        change = InventoryEntry.describe_change(old_ie, new_ie)
834
 
        self.assertEqual(expected_change, change)
835
 
 
836
 
 
837
 
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
838
 
 
839
 
    def get_chk_bytes(self):
840
 
        factory = groupcompress.make_pack_factory(True, True, 1)
841
 
        trans = self.get_transport('')
842
 
        return factory(trans)
843
 
 
844
 
    def read_bytes(self, chk_bytes, key):
845
 
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
846
 
        return stream.next().get_bytes_as("fulltext")
847
 
 
848
 
    def test_deserialise_gives_CHKInventory(self):
849
 
        inv = Inventory()
850
 
        inv.revision_id = "revid"
851
 
        inv.root.revision = "rootrev"
852
 
        chk_bytes = self.get_chk_bytes()
853
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
854
 
        bytes = ''.join(chk_inv.to_lines())
855
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
856
 
        self.assertEqual("revid", new_inv.revision_id)
857
 
        self.assertEqual("directory", new_inv.root.kind)
858
 
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
859
 
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
860
 
        self.assertEqual(inv.root.name, new_inv.root.name)
861
 
        self.assertEqual("rootrev", new_inv.root.revision)
862
 
        self.assertEqual('plain', new_inv._search_key_name)
863
 
 
864
 
    def test_deserialise_wrong_revid(self):
865
 
        inv = Inventory()
866
 
        inv.revision_id = "revid"
867
 
        inv.root.revision = "rootrev"
868
 
        chk_bytes = self.get_chk_bytes()
869
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
870
 
        bytes = ''.join(chk_inv.to_lines())
871
 
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
872
 
            bytes, ("revid2",))
873
 
 
874
 
    def test_captures_rev_root_byid(self):
875
 
        inv = Inventory()
876
 
        inv.revision_id = "foo"
877
 
        inv.root.revision = "bar"
878
 
        chk_bytes = self.get_chk_bytes()
879
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
880
 
        lines = chk_inv.to_lines()
881
 
        self.assertEqual([
882
 
            'chkinventory:\n',
883
 
            'revision_id: foo\n',
884
 
            'root_id: TREE_ROOT\n',
885
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
886
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
887
 
            ], lines)
888
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
889
 
        self.assertEqual('plain', chk_inv._search_key_name)
890
 
 
891
 
    def test_captures_parent_id_basename_index(self):
892
 
        inv = Inventory()
893
 
        inv.revision_id = "foo"
894
 
        inv.root.revision = "bar"
895
 
        chk_bytes = self.get_chk_bytes()
896
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
897
 
        lines = chk_inv.to_lines()
898
 
        self.assertEqual([
899
 
            'chkinventory:\n',
900
 
            'revision_id: foo\n',
901
 
            'root_id: TREE_ROOT\n',
902
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
903
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
904
 
            ], lines)
905
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
906
 
        self.assertEqual('plain', chk_inv._search_key_name)
907
 
 
908
 
    def test_captures_search_key_name(self):
909
 
        inv = Inventory()
910
 
        inv.revision_id = "foo"
911
 
        inv.root.revision = "bar"
912
 
        chk_bytes = self.get_chk_bytes()
913
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
914
 
                                              search_key_name='hash-16-way')
915
 
        lines = chk_inv.to_lines()
916
 
        self.assertEqual([
917
 
            'chkinventory:\n',
918
 
            'search_key_name: hash-16-way\n',
919
 
            'root_id: TREE_ROOT\n',
920
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
921
 
            'revision_id: foo\n',
922
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
923
 
            ], lines)
924
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
925
 
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
926
 
 
927
 
    def test_directory_children_on_demand(self):
928
 
        inv = Inventory()
929
 
        inv.revision_id = "revid"
930
 
        inv.root.revision = "rootrev"
931
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
932
 
        inv["fileid"].revision = "filerev"
933
 
        inv["fileid"].executable = True
934
 
        inv["fileid"].text_sha1 = "ffff"
935
 
        inv["fileid"].text_size = 1
936
 
        chk_bytes = self.get_chk_bytes()
937
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
938
 
        bytes = ''.join(chk_inv.to_lines())
939
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
940
 
        root_entry = new_inv[inv.root.file_id]
941
 
        self.assertEqual(None, root_entry._children)
942
 
        self.assertEqual(['file'], root_entry.children.keys())
943
 
        file_direct = new_inv["fileid"]
944
 
        file_found = root_entry.children['file']
945
 
        self.assertEqual(file_direct.kind, file_found.kind)
946
 
        self.assertEqual(file_direct.file_id, file_found.file_id)
947
 
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
948
 
        self.assertEqual(file_direct.name, file_found.name)
949
 
        self.assertEqual(file_direct.revision, file_found.revision)
950
 
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
951
 
        self.assertEqual(file_direct.text_size, file_found.text_size)
952
 
        self.assertEqual(file_direct.executable, file_found.executable)
953
 
 
954
 
    def test_from_inventory_maximum_size(self):
955
 
        # from_inventory supports the maximum_size parameter.
956
 
        inv = Inventory()
957
 
        inv.revision_id = "revid"
958
 
        inv.root.revision = "rootrev"
959
 
        chk_bytes = self.get_chk_bytes()
960
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
961
 
        chk_inv.id_to_entry._ensure_root()
962
 
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
963
 
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
964
 
        p_id_basename = chk_inv.parent_id_basename_to_file_id
965
 
        p_id_basename._ensure_root()
966
 
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
967
 
        self.assertEqual(2, p_id_basename._root_node._key_width)
968
 
 
969
 
    def test___iter__(self):
970
 
        inv = Inventory()
971
 
        inv.revision_id = "revid"
972
 
        inv.root.revision = "rootrev"
973
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
974
 
        inv["fileid"].revision = "filerev"
975
 
        inv["fileid"].executable = True
976
 
        inv["fileid"].text_sha1 = "ffff"
977
 
        inv["fileid"].text_size = 1
978
 
        chk_bytes = self.get_chk_bytes()
979
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
980
 
        bytes = ''.join(chk_inv.to_lines())
981
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
982
 
        fileids = list(new_inv.__iter__())
983
 
        fileids.sort()
984
 
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
985
 
 
986
 
    def test__len__(self):
987
 
        inv = Inventory()
988
 
        inv.revision_id = "revid"
989
 
        inv.root.revision = "rootrev"
990
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
991
 
        inv["fileid"].revision = "filerev"
992
 
        inv["fileid"].executable = True
993
 
        inv["fileid"].text_sha1 = "ffff"
994
 
        inv["fileid"].text_size = 1
995
 
        chk_bytes = self.get_chk_bytes()
996
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
997
 
        self.assertEqual(2, len(chk_inv))
998
 
 
999
 
    def test___getitem__(self):
1000
 
        inv = Inventory()
1001
 
        inv.revision_id = "revid"
1002
 
        inv.root.revision = "rootrev"
1003
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1004
 
        inv["fileid"].revision = "filerev"
1005
 
        inv["fileid"].executable = True
1006
 
        inv["fileid"].text_sha1 = "ffff"
1007
 
        inv["fileid"].text_size = 1
1008
 
        chk_bytes = self.get_chk_bytes()
1009
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1010
 
        bytes = ''.join(chk_inv.to_lines())
1011
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1012
 
        root_entry = new_inv[inv.root.file_id]
1013
 
        file_entry = new_inv["fileid"]
1014
 
        self.assertEqual("directory", root_entry.kind)
1015
 
        self.assertEqual(inv.root.file_id, root_entry.file_id)
1016
 
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
1017
 
        self.assertEqual(inv.root.name, root_entry.name)
1018
 
        self.assertEqual("rootrev", root_entry.revision)
1019
 
        self.assertEqual("file", file_entry.kind)
1020
 
        self.assertEqual("fileid", file_entry.file_id)
1021
 
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
1022
 
        self.assertEqual("file", file_entry.name)
1023
 
        self.assertEqual("filerev", file_entry.revision)
1024
 
        self.assertEqual("ffff", file_entry.text_sha1)
1025
 
        self.assertEqual(1, file_entry.text_size)
1026
 
        self.assertEqual(True, file_entry.executable)
1027
 
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
1028
 
 
1029
 
    def test_has_id_true(self):
1030
 
        inv = Inventory()
1031
 
        inv.revision_id = "revid"
1032
 
        inv.root.revision = "rootrev"
1033
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1034
 
        inv["fileid"].revision = "filerev"
1035
 
        inv["fileid"].executable = True
1036
 
        inv["fileid"].text_sha1 = "ffff"
1037
 
        inv["fileid"].text_size = 1
1038
 
        chk_bytes = self.get_chk_bytes()
1039
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1040
 
        self.assertTrue(chk_inv.has_id('fileid'))
1041
 
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
1042
 
 
1043
 
    def test_has_id_not(self):
1044
 
        inv = Inventory()
1045
 
        inv.revision_id = "revid"
1046
 
        inv.root.revision = "rootrev"
1047
 
        chk_bytes = self.get_chk_bytes()
1048
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1049
 
        self.assertFalse(chk_inv.has_id('fileid'))
1050
 
 
1051
 
    def test_id2path(self):
1052
 
        inv = Inventory()
1053
 
        inv.revision_id = "revid"
1054
 
        inv.root.revision = "rootrev"
1055
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1056
 
        fileentry = InventoryFile("fileid", "file", "dirid")
1057
 
        inv.add(direntry)
1058
 
        inv.add(fileentry)
1059
 
        inv["fileid"].revision = "filerev"
1060
 
        inv["fileid"].executable = True
1061
 
        inv["fileid"].text_sha1 = "ffff"
1062
 
        inv["fileid"].text_size = 1
1063
 
        inv["dirid"].revision = "filerev"
1064
 
        chk_bytes = self.get_chk_bytes()
1065
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1066
 
        bytes = ''.join(chk_inv.to_lines())
1067
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1068
 
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
1069
 
        self.assertEqual('dir', new_inv.id2path('dirid'))
1070
 
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
1071
 
 
1072
 
    def test_path2id(self):
1073
 
        inv = Inventory()
1074
 
        inv.revision_id = "revid"
1075
 
        inv.root.revision = "rootrev"
1076
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
1077
 
        fileentry = InventoryFile("fileid", "file", "dirid")
1078
 
        inv.add(direntry)
1079
 
        inv.add(fileentry)
1080
 
        inv["fileid"].revision = "filerev"
1081
 
        inv["fileid"].executable = True
1082
 
        inv["fileid"].text_sha1 = "ffff"
1083
 
        inv["fileid"].text_size = 1
1084
 
        inv["dirid"].revision = "filerev"
1085
 
        chk_bytes = self.get_chk_bytes()
1086
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1087
 
        bytes = ''.join(chk_inv.to_lines())
1088
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1089
 
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
1090
 
        self.assertEqual('dirid', new_inv.path2id('dir'))
1091
 
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
1092
 
 
1093
 
    def test_create_by_apply_delta_sets_root(self):
1094
 
        inv = Inventory()
1095
 
        inv.revision_id = "revid"
1096
 
        chk_bytes = self.get_chk_bytes()
1097
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1098
 
        inv.add_path("", "directory", "myrootid", None)
1099
 
        inv.revision_id = "expectedid"
1100
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1101
 
        delta = [("", None, base_inv.root.file_id, None),
1102
 
            (None, "",  "myrootid", inv.root)]
1103
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1104
 
        self.assertEquals(reference_inv.root, new_inv.root)
1105
 
 
1106
 
    def test_create_by_apply_delta_empty_add_child(self):
1107
 
        inv = Inventory()
1108
 
        inv.revision_id = "revid"
1109
 
        inv.root.revision = "rootrev"
1110
 
        chk_bytes = self.get_chk_bytes()
1111
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1112
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1113
 
        a_entry.revision = "filerev"
1114
 
        a_entry.executable = True
1115
 
        a_entry.text_sha1 = "ffff"
1116
 
        a_entry.text_size = 1
1117
 
        inv.add(a_entry)
1118
 
        inv.revision_id = "expectedid"
1119
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1120
 
        delta = [(None, "A",  "A-id", a_entry)]
1121
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1122
 
        # new_inv should be the same as reference_inv.
1123
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1124
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1125
 
        reference_inv.id_to_entry._ensure_root()
1126
 
        new_inv.id_to_entry._ensure_root()
1127
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1128
 
            new_inv.id_to_entry._root_node._key)
1129
 
 
1130
 
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1131
 
        inv = Inventory()
1132
 
        inv.revision_id = "revid"
1133
 
        inv.root.revision = "rootrev"
1134
 
        chk_bytes = self.get_chk_bytes()
1135
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1136
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1137
 
        a_entry.revision = "filerev"
1138
 
        a_entry.executable = True
1139
 
        a_entry.text_sha1 = "ffff"
1140
 
        a_entry.text_size = 1
1141
 
        inv.add(a_entry)
1142
 
        inv.revision_id = "expectedid"
1143
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1144
 
        delta = [(None, "A",  "A-id", a_entry)]
1145
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1146
 
        reference_inv.id_to_entry._ensure_root()
1147
 
        reference_inv.parent_id_basename_to_file_id._ensure_root()
1148
 
        new_inv.id_to_entry._ensure_root()
1149
 
        new_inv.parent_id_basename_to_file_id._ensure_root()
1150
 
        # new_inv should be the same as reference_inv.
1151
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1152
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1153
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1154
 
            new_inv.id_to_entry._root_node._key)
1155
 
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1156
 
            new_inv.parent_id_basename_to_file_id._root_node._key)
1157
 
 
1158
 
    def test_iter_changes(self):
1159
 
        # Low level bootstrapping smoke test; comprehensive generic tests via
1160
 
        # InterTree are coming.
1161
 
        inv = Inventory()
1162
 
        inv.revision_id = "revid"
1163
 
        inv.root.revision = "rootrev"
1164
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1165
 
        inv["fileid"].revision = "filerev"
1166
 
        inv["fileid"].executable = True
1167
 
        inv["fileid"].text_sha1 = "ffff"
1168
 
        inv["fileid"].text_size = 1
1169
 
        inv2 = Inventory()
1170
 
        inv2.revision_id = "revid2"
1171
 
        inv2.root.revision = "rootrev"
1172
 
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1173
 
        inv2["fileid"].revision = "filerev2"
1174
 
        inv2["fileid"].executable = False
1175
 
        inv2["fileid"].text_sha1 = "bbbb"
1176
 
        inv2["fileid"].text_size = 2
1177
 
        # get fresh objects.
1178
 
        chk_bytes = self.get_chk_bytes()
1179
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1180
 
        bytes = ''.join(chk_inv.to_lines())
1181
 
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1182
 
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1183
 
        bytes = ''.join(chk_inv2.to_lines())
1184
 
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1185
 
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1186
 
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1187
 
            (False, True))],
1188
 
            list(inv_1.iter_changes(inv_2)))
1189
 
 
1190
 
    def test_parent_id_basename_to_file_id_index_enabled(self):
1191
 
        inv = Inventory()
1192
 
        inv.revision_id = "revid"
1193
 
        inv.root.revision = "rootrev"
1194
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1195
 
        inv["fileid"].revision = "filerev"
1196
 
        inv["fileid"].executable = True
1197
 
        inv["fileid"].text_sha1 = "ffff"
1198
 
        inv["fileid"].text_size = 1
1199
 
        # get fresh objects.
1200
 
        chk_bytes = self.get_chk_bytes()
1201
 
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1202
 
        bytes = ''.join(tmp_inv.to_lines())
1203
 
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1204
 
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1205
 
        self.assertEqual(
1206
 
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1207
 
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1208
 
 
1209
 
    def test_file_entry_to_bytes(self):
1210
 
        inv = CHKInventory(None)
1211
 
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1212
 
        ie.executable = True
1213
 
        ie.revision = 'file-rev-id'
1214
 
        ie.text_sha1 = 'abcdefgh'
1215
 
        ie.text_size = 100
1216
 
        bytes = inv._entry_to_bytes(ie)
1217
 
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
1218
 
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
1219
 
        ie2 = inv._bytes_to_entry(bytes)
1220
 
        self.assertEqual(ie, ie2)
1221
 
        self.assertIsInstance(ie2.name, unicode)
1222
 
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1223
 
                         inv._bytes_to_utf8name_key(bytes))
1224
 
 
1225
 
    def test_file2_entry_to_bytes(self):
1226
 
        inv = CHKInventory(None)
1227
 
        # \u30a9 == 'omega'
1228
 
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1229
 
        ie.executable = False
1230
 
        ie.revision = 'file-rev-id'
1231
 
        ie.text_sha1 = '123456'
1232
 
        ie.text_size = 25
1233
 
        bytes = inv._entry_to_bytes(ie)
1234
 
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1235
 
                         'file-rev-id\n123456\n25\nN', bytes)
1236
 
        ie2 = inv._bytes_to_entry(bytes)
1237
 
        self.assertEqual(ie, ie2)
1238
 
        self.assertIsInstance(ie2.name, unicode)
1239
 
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1240
 
                         inv._bytes_to_utf8name_key(bytes))
1241
 
 
1242
 
    def test_dir_entry_to_bytes(self):
1243
 
        inv = CHKInventory(None)
1244
 
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1245
 
        ie.revision = 'dir-rev-id'
1246
 
        bytes = inv._entry_to_bytes(ie)
1247
 
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1248
 
        ie2 = inv._bytes_to_entry(bytes)
1249
 
        self.assertEqual(ie, ie2)
1250
 
        self.assertIsInstance(ie2.name, unicode)
1251
 
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1252
 
                         inv._bytes_to_utf8name_key(bytes))
1253
 
 
1254
 
    def test_dir2_entry_to_bytes(self):
1255
 
        inv = CHKInventory(None)
1256
 
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1257
 
                                          None)
1258
 
        ie.revision = 'dir-rev-id'
1259
 
        bytes = inv._entry_to_bytes(ie)
1260
 
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1261
 
                         'dir-rev-id', bytes)
1262
 
        ie2 = inv._bytes_to_entry(bytes)
1263
 
        self.assertEqual(ie, ie2)
1264
 
        self.assertIsInstance(ie2.name, unicode)
1265
 
        self.assertIs(ie2.parent_id, None)
1266
 
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1267
 
                         inv._bytes_to_utf8name_key(bytes))
1268
 
 
1269
 
    def test_symlink_entry_to_bytes(self):
1270
 
        inv = CHKInventory(None)
1271
 
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1272
 
        ie.revision = 'link-rev-id'
1273
 
        ie.symlink_target = u'target/path'
1274
 
        bytes = inv._entry_to_bytes(ie)
1275
 
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1276
 
                         'link-rev-id\ntarget/path', bytes)
1277
 
        ie2 = inv._bytes_to_entry(bytes)
1278
 
        self.assertEqual(ie, ie2)
1279
 
        self.assertIsInstance(ie2.name, unicode)
1280
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1281
 
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1282
 
                         inv._bytes_to_utf8name_key(bytes))
1283
 
 
1284
 
    def test_symlink2_entry_to_bytes(self):
1285
 
        inv = CHKInventory(None)
1286
 
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1287
 
        ie.revision = 'link-rev-id'
1288
 
        ie.symlink_target = u'target/\u03a9path'
1289
 
        bytes = inv._entry_to_bytes(ie)
1290
 
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1291
 
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
1292
 
        ie2 = inv._bytes_to_entry(bytes)
1293
 
        self.assertEqual(ie, ie2)
1294
 
        self.assertIsInstance(ie2.name, unicode)
1295
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1296
 
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1297
 
                         inv._bytes_to_utf8name_key(bytes))
1298
 
 
1299
 
    def test_tree_reference_entry_to_bytes(self):
1300
 
        inv = CHKInventory(None)
1301
 
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1302
 
                                     'parent-id')
1303
 
        ie.revision = 'tree-rev-id'
1304
 
        ie.reference_revision = 'ref-rev-id'
1305
 
        bytes = inv._entry_to_bytes(ie)
1306
 
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1307
 
                         'tree-rev-id\nref-rev-id', bytes)
1308
 
        ie2 = inv._bytes_to_entry(bytes)
1309
 
        self.assertEqual(ie, ie2)
1310
 
        self.assertIsInstance(ie2.name, unicode)
1311
 
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1312
 
                         inv._bytes_to_utf8name_key(bytes))
1313
 
 
1314
 
    def make_basic_utf8_inventory(self):
1315
 
        inv = Inventory()
1316
 
        inv.revision_id = "revid"
1317
 
        inv.root.revision = "rootrev"
1318
 
        root_id = inv.root.file_id
1319
 
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1320
 
        inv["fileid"].revision = "filerev"
1321
 
        inv["fileid"].text_sha1 = "ffff"
1322
 
        inv["fileid"].text_size = 0
1323
 
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1324
 
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1325
 
        inv["childid"].revision = "filerev"
1326
 
        inv["childid"].text_sha1 = "ffff"
1327
 
        inv["childid"].text_size = 0
1328
 
        chk_bytes = self.get_chk_bytes()
1329
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1330
 
        bytes = ''.join(chk_inv.to_lines())
1331
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1332
 
 
1333
 
    def test__preload_handles_utf8(self):
1334
 
        new_inv = self.make_basic_utf8_inventory()
1335
 
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
1336
 
        self.assertFalse(new_inv._fully_cached)
1337
 
        new_inv._preload_cache()
1338
 
        self.assertEqual(
1339
 
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1340
 
            sorted(new_inv._fileid_to_entry_cache.keys()))
1341
 
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1342
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1343
 
                         sorted(ie_root._children.keys()))
1344
 
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
1345
 
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1346
 
 
1347
 
    def test__preload_populates_cache(self):
1348
 
        inv = Inventory()
1349
 
        inv.revision_id = "revid"
1350
 
        inv.root.revision = "rootrev"
1351
 
        root_id = inv.root.file_id
1352
 
        inv.add(InventoryFile("fileid", "file", root_id))
1353
 
        inv["fileid"].revision = "filerev"
1354
 
        inv["fileid"].executable = True
1355
 
        inv["fileid"].text_sha1 = "ffff"
1356
 
        inv["fileid"].text_size = 1
1357
 
        inv.add(InventoryDirectory("dirid", "dir", root_id))
1358
 
        inv.add(InventoryFile("childid", "child", "dirid"))
1359
 
        inv["childid"].revision = "filerev"
1360
 
        inv["childid"].executable = False
1361
 
        inv["childid"].text_sha1 = "dddd"
1362
 
        inv["childid"].text_size = 1
1363
 
        chk_bytes = self.get_chk_bytes()
1364
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1365
 
        bytes = ''.join(chk_inv.to_lines())
1366
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1367
 
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
1368
 
        self.assertFalse(new_inv._fully_cached)
1369
 
        new_inv._preload_cache()
1370
 
        self.assertEqual(
1371
 
            sorted([root_id, "fileid", "dirid", "childid"]),
1372
 
            sorted(new_inv._fileid_to_entry_cache.keys()))
1373
 
        self.assertTrue(new_inv._fully_cached)
1374
 
        ie_root = new_inv._fileid_to_entry_cache[root_id]
1375
 
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1376
 
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
1377
 
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1378
 
 
1379
 
    def test__preload_handles_partially_evaluated_inventory(self):
1380
 
        new_inv = self.make_basic_utf8_inventory()
1381
 
        ie = new_inv[new_inv.root_id]
1382
 
        self.assertIs(None, ie._children)
1383
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1384
 
                         sorted(ie.children.keys()))
1385
 
        # Accessing .children loads _children
1386
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1387
 
                         sorted(ie._children.keys()))
1388
 
        new_inv._preload_cache()
1389
 
        # No change
1390
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1391
 
                         sorted(ie._children.keys()))
1392
 
        ie_dir = new_inv["dirid"]
1393
 
        self.assertEqual([u'ch\xefld'],
1394
 
                         sorted(ie_dir._children.keys()))
1395
 
 
1396
 
    def test_filter_change_in_renamed_subfolder(self):
1397
 
        inv = Inventory('tree-root')
1398
 
        src_ie = inv.add_path('src', 'directory', 'src-id')
1399
 
        inv.add_path('src/sub/', 'directory', 'sub-id')
1400
 
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1401
 
        a_ie.text_sha1 = osutils.sha_string('content\n')
1402
 
        a_ie.text_size = len('content\n')
1403
 
        chk_bytes = self.get_chk_bytes()
1404
 
        inv = CHKInventory.from_inventory(chk_bytes, inv)
1405
 
        inv = inv.create_by_apply_delta([
1406
 
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
1407
 
            ("src", "src2", "src-id", src_ie),
1408
 
            ], 'new-rev-2')
1409
 
        new_inv = inv.filter(['a-id', 'src-id'])
1410
 
        self.assertEqual([
1411
 
            ('', 'tree-root'),
1412
 
            ('src', 'src-id'),
1413
 
            ('src/sub', 'sub-id'),
1414
 
            ('src/sub/a', 'a-id'),
1415
 
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1416
 
 
1417
 
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1418
 
 
1419
 
    def get_chk_bytes(self):
1420
 
        factory = groupcompress.make_pack_factory(True, True, 1)
1421
 
        trans = self.get_transport('')
1422
 
        return factory(trans)
1423
 
 
1424
 
    def make_dir(self, inv, name, parent_id):
1425
 
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1426
 
 
1427
 
    def make_file(self, inv, name, parent_id, content='content\n'):
1428
 
        ie = inv.make_entry('file', name, parent_id, name + '-id')
1429
 
        ie.text_sha1 = osutils.sha_string(content)
1430
 
        ie.text_size = len(content)
1431
 
        inv.add(ie)
1432
 
 
1433
 
    def make_simple_inventory(self):
1434
 
        inv = Inventory('TREE_ROOT')
1435
 
        inv.revision_id = "revid"
1436
 
        inv.root.revision = "rootrev"
1437
 
        # /                 TREE_ROOT
1438
 
        # dir1/             dir1-id
1439
 
        #   sub-file1       sub-file1-id
1440
 
        #   sub-file2       sub-file2-id
1441
 
        #   sub-dir1/       sub-dir1-id
1442
 
        #     subsub-file1  subsub-file1-id
1443
 
        # dir2/             dir2-id
1444
 
        #   sub2-file1      sub2-file1-id
1445
 
        # top               top-id
1446
 
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
1447
 
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
1448
 
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
1449
 
        self.make_file(inv, 'top', 'TREE_ROOT')
1450
 
        self.make_file(inv, 'sub-file1', 'dir1-id')
1451
 
        self.make_file(inv, 'sub-file2', 'dir1-id')
1452
 
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1453
 
        self.make_file(inv, 'sub2-file1', 'dir2-id')
1454
 
        chk_bytes = self.get_chk_bytes()
1455
 
        #  use a small maximum_size to force internal paging structures
1456
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1457
 
                        maximum_size=100,
1458
 
                        search_key_name='hash-255-way')
1459
 
        bytes = ''.join(chk_inv.to_lines())
1460
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1461
 
 
1462
 
    def assert_Getitems(self, expected_fileids, inv, file_ids):
1463
 
        self.assertEqual(sorted(expected_fileids),
1464
 
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1465
 
 
1466
 
    def assertExpand(self, all_ids, inv, file_ids):
1467
 
        (val_all_ids,
1468
 
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1469
 
        self.assertEqual(set(all_ids), val_all_ids)
1470
 
        entries = inv._getitems(val_all_ids)
1471
 
        expected_children = {}
1472
 
        for entry in entries:
1473
 
            s = expected_children.setdefault(entry.parent_id, [])
1474
 
            s.append(entry.file_id)
1475
 
        val_children = dict((k, sorted(v)) for k, v
1476
 
                            in val_children.iteritems())
1477
 
        expected_children = dict((k, sorted(v)) for k, v
1478
 
                            in expected_children.iteritems())
1479
 
        self.assertEqual(expected_children, val_children)
1480
 
 
1481
 
    def test_make_simple_inventory(self):
1482
 
        inv = self.make_simple_inventory()
1483
 
        layout = []
1484
 
        for path, entry in inv.iter_entries_by_dir():
1485
 
            layout.append((path, entry.file_id))
1486
 
        self.assertEqual([
1487
 
            ('', 'TREE_ROOT'),
1488
 
            ('dir1', 'dir1-id'),
1489
 
            ('dir2', 'dir2-id'),
1490
 
            ('top', 'top-id'),
1491
 
            ('dir1/sub-dir1', 'sub-dir1-id'),
1492
 
            ('dir1/sub-file1', 'sub-file1-id'),
1493
 
            ('dir1/sub-file2', 'sub-file2-id'),
1494
 
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1495
 
            ('dir2/sub2-file1', 'sub2-file1-id'),
1496
 
            ], layout)
1497
 
 
1498
 
    def test__getitems(self):
1499
 
        inv = self.make_simple_inventory()
1500
 
        # Reading from disk
1501
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1502
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1503
 
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1504
 
        # From cache
1505
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1506
 
        # Mixed
1507
 
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1508
 
                             ['dir1-id', 'sub-file2-id'])
1509
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1510
 
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1511
 
 
1512
 
    def test_single_file(self):
1513
 
        inv = self.make_simple_inventory()
1514
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1515
 
 
1516
 
    def test_get_all_parents(self):
1517
 
        inv = self.make_simple_inventory()
1518
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1519
 
                           'subsub-file1-id',
1520
 
                          ], inv, ['subsub-file1-id'])
1521
 
 
1522
 
    def test_get_children(self):
1523
 
        inv = self.make_simple_inventory()
1524
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1525
 
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1526
 
                          ], inv, ['dir1-id'])
1527
 
 
1528
 
    def test_from_root(self):
1529
 
        inv = self.make_simple_inventory()
1530
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1531
 
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1532
 
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1533
 
 
1534
 
    def test_top_level_file(self):
1535
 
        inv = self.make_simple_inventory()
1536
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1537
 
 
1538
 
    def test_subsub_file(self):
1539
 
        inv = self.make_simple_inventory()
1540
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1541
 
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
1542
 
 
1543
 
    def test_sub_and_root(self):
1544
 
        inv = self.make_simple_inventory()
1545
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1546
 
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1547
 
 
1548
 
 
1549
 
class TestMutableInventoryFromTree(TestCaseWithTransport):
1550
 
 
1551
 
    def test_empty(self):
1552
 
        repository = self.make_repository('.')
1553
 
        tree = repository.revision_tree(revision.NULL_REVISION)
1554
 
        inv = mutable_inventory_from_tree(tree)
1555
 
        self.assertEquals(revision.NULL_REVISION, inv.revision_id)
1556
 
        self.assertEquals(0, len(inv))
1557
 
 
1558
 
    def test_some_files(self):
1559
 
        wt = self.make_branch_and_tree('.')
1560
 
        self.build_tree(['a'])
1561
 
        wt.add(['a'], ['thefileid'])
1562
 
        revid = wt.commit("commit")
1563
 
        tree = wt.branch.repository.revision_tree(revid)
1564
 
        inv = mutable_inventory_from_tree(tree)
1565
 
        self.assertEquals(revid, inv.revision_id)
1566
 
        self.assertEquals(2, len(inv))
1567
 
        self.assertEquals("a", inv['thefileid'].name)
1568
 
        # The inventory should be mutable and independent of
1569
 
        # the original tree
1570
 
        self.assertFalse(tree.inventory['thefileid'].executable)
1571
 
        inv['thefileid'].executable = True
1572
 
        self.assertFalse(tree.inventory['thefileid'].executable)
 
346
            self.branch.unlock()
 
347
        self.assertEqual(self.branch.revision_history(), ['A'])
 
348
        self.wt.commit('another add of file', rev_id='C')
 
349
        self.inv_C = self.branch.repository.get_inventory('C')
 
350
        self.wt.add_pending_merge('B')
 
351
        self.wt.commit('merge in B', rev_id='D')
 
352
        self.inv_D = self.branch.repository.get_inventory('D')
 
353
        self.file_active = self.wt.inventory['fileid']
 
354
        self.weave = self.branch.repository.weave_store.get_weave('fileid',
 
355
            self.branch.repository.get_transaction())
 
356
        
 
357
    def get_previous_heads(self, inventories):
 
358
        return self.file_active.find_previous_heads(
 
359
            inventories, 
 
360
            self.branch.repository.weave_store,
 
361
            self.branch.repository.get_transaction())
 
362
        
 
363
    def test_fileid_in_no_inventory(self):
 
364
        self.assertEqual({}, self.get_previous_heads([self.inv_A]))
 
365
 
 
366
    def test_fileid_in_one_inventory(self):
 
367
        self.assertEqual({'B':self.inv_B['fileid']},
 
368
                         self.get_previous_heads([self.inv_B]))
 
369
        self.assertEqual({'B':self.inv_B['fileid']},
 
370
                         self.get_previous_heads([self.inv_A, self.inv_B]))
 
371
        self.assertEqual({'B':self.inv_B['fileid']},
 
372
                         self.get_previous_heads([self.inv_B, self.inv_A]))
 
373
 
 
374
    def test_fileid_in_two_inventories_gives_both_entries(self):
 
375
        self.assertEqual({'B':self.inv_B['fileid'],
 
376
                          'C':self.inv_C['fileid']},
 
377
                          self.get_previous_heads([self.inv_B, self.inv_C]))
 
378
        self.assertEqual({'B':self.inv_B['fileid'],
 
379
                          'C':self.inv_C['fileid']},
 
380
                          self.get_previous_heads([self.inv_C, self.inv_B]))
 
381
 
 
382
    def test_fileid_in_two_inventories_already_merged_gives_head(self):
 
383
        self.assertEqual({'D':self.inv_D['fileid']},
 
384
                         self.get_previous_heads([self.inv_B, self.inv_D]))
 
385
        self.assertEqual({'D':self.inv_D['fileid']},
 
386
                         self.get_previous_heads([self.inv_D, self.inv_B]))
 
387
 
 
388
    # TODO: test two inventories with the same file revision 
 
389
 
 
390
 
 
391
class TestExecutable(TestCaseWithTransport):
 
392
 
 
393
    def test_stays_executable(self):
 
394
        basic_inv = """<inventory format="5">
 
395
<file file_id="a-20051208024829-849e76f7968d7a86" name="a" executable="yes" />
 
396
<file file_id="b-20051208024829-849e76f7968d7a86" name="b" />
 
397
</inventory>
 
398
"""
 
399
        wt = self.make_branch_and_tree('b1')
 
400
        b = wt.branch
 
401
        open('b1/a', 'wb').write('a test\n')
 
402
        open('b1/b', 'wb').write('b test\n')
 
403
        os.chmod('b1/a', 0755)
 
404
        os.chmod('b1/b', 0644)
 
405
        # Manually writing the inventory, to ensure that
 
406
        # the executable="yes" entry is set for 'a' and not for 'b'
 
407
        open('b1/.bzr/inventory', 'wb').write(basic_inv)
 
408
 
 
409
        a_id = "a-20051208024829-849e76f7968d7a86"
 
410
        b_id = "b-20051208024829-849e76f7968d7a86"
 
411
        wt = wt.bzrdir.open_workingtree()
 
412
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
413
 
 
414
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
415
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
416
 
 
417
        wt.commit('adding a,b', rev_id='r1')
 
418
 
 
419
        rev_tree = b.repository.revision_tree('r1')
 
420
        self.failUnless(rev_tree.is_executable(a_id), "'a' lost the execute bit")
 
421
        self.failIf(rev_tree.is_executable(b_id), "'b' gained an execute bit")
 
422
 
 
423
        self.failUnless(rev_tree.inventory[a_id].executable)
 
424
        self.failIf(rev_tree.inventory[b_id].executable)
 
425
 
 
426
        # Make sure the entries are gone
 
427
        os.remove('b1/a')
 
428
        os.remove('b1/b')
 
429
        self.failIf(wt.has_id(a_id))
 
430
        self.failIf(wt.has_filename('a'))
 
431
        self.failIf(wt.has_id(b_id))
 
432
        self.failIf(wt.has_filename('b'))
 
433
 
 
434
        # Make sure that revert is able to bring them back,
 
435
        # and sets 'a' back to being executable
 
436
 
 
437
        wt.revert(['a', 'b'], rev_tree, backups=False)
 
438
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
439
 
 
440
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
441
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
442
 
 
443
        # Now remove them again, and make sure that after a
 
444
        # commit, they are still marked correctly
 
445
        os.remove('b1/a')
 
446
        os.remove('b1/b')
 
447
        wt.commit('removed', rev_id='r2')
 
448
 
 
449
        self.assertEqual([], [cn for cn,ie in wt.inventory.iter_entries()])
 
450
        self.failIf(wt.has_id(a_id))
 
451
        self.failIf(wt.has_filename('a'))
 
452
        self.failIf(wt.has_id(b_id))
 
453
        self.failIf(wt.has_filename('b'))
 
454
 
 
455
        # Now revert back to the previous commit
 
456
        wt.revert([], rev_tree, backups=False)
 
457
        self.assertEqual(['a', 'b'], [cn for cn,ie in wt.inventory.iter_entries()])
 
458
 
 
459
        self.failUnless(wt.is_executable(a_id), "'a' lost the execute bit")
 
460
        self.failIf(wt.is_executable(b_id), "'b' gained an execute bit")
 
461
 
 
462
        # Now make sure that 'bzr branch' also preserves the
 
463
        # executable bit
 
464
        # TODO: Maybe this should be a blackbox test
 
465
        d2 = b.bzrdir.clone('b2', revision_id='r1')
 
466
        t2 = d2.open_workingtree()
 
467
        b2 = t2.branch
 
468
        self.assertEquals('r1', b2.last_revision())
 
469
 
 
470
        self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
 
471
        self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
 
472
        self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
 
473
 
 
474
        # Make sure pull will delete the files
 
475
        t2.pull(b)
 
476
        self.assertEquals('r2', b2.last_revision())
 
477
        self.assertEqual([], [cn for cn,ie in t2.inventory.iter_entries()])
 
478
 
 
479
        # Now commit the changes on the first branch
 
480
        # so that the second branch can pull the changes
 
481
        # and make sure that the executable bit has been copied
 
482
        wt.commit('resurrected', rev_id='r3')
 
483
 
 
484
        t2.pull(b)
 
485
        self.assertEquals('r3', b2.last_revision())
 
486
        self.assertEqual(['a', 'b'], [cn for cn,ie in t2.inventory.iter_entries()])
 
487
 
 
488
        self.failUnless(t2.is_executable(a_id), "'a' lost the execute bit")
 
489
        self.failIf(t2.is_executable(b_id), "'b' gained an execute bit")
 
490
 
 
491
class TestRevert(TestCaseWithTransport):
 
492
    def test_dangling_id(self):
 
493
        wt = self.make_branch_and_tree('b1')
 
494
        self.assertEqual(len(wt.inventory), 1)
 
495
        open('b1/a', 'wb').write('a test\n')
 
496
        wt.add('a')
 
497
        self.assertEqual(len(wt.inventory), 2)
 
498
        os.unlink('b1/a')
 
499
        wt.revert([])
 
500
        self.assertEqual(len(wt.inventory), 1)
 
501
 
 
502