~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Tarmac
  • Author(s): Vincent Ladeuil
  • Date: 2017-01-30 14:42:05 UTC
  • mfrom: (6620.1.1 trunk)
  • Revision ID: tarmac-20170130144205-r8fh2xpmiuxyozpv
Merge  2.7 into trunk including fix for bug #1657238 [r=vila]

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2005-2012, 2016 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
 
18
18
from bzrlib import (
19
19
    chk_map,
20
 
    bzrdir,
 
20
    groupcompress,
21
21
    errors,
22
22
    inventory,
23
23
    osutils,
24
24
    repository,
25
25
    revision,
26
 
    )
27
 
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
28
 
    InventoryDirectory, InventoryEntry, TreeReference)
 
26
    tests,
 
27
    workingtree,
 
28
    )
 
29
from bzrlib.inventory import (
 
30
    CHKInventory,
 
31
    Inventory,
 
32
    ROOT_ID,
 
33
    InventoryFile,
 
34
    InventoryDirectory,
 
35
    InventoryEntry,
 
36
    TreeReference,
 
37
    mutable_inventory_from_tree,
 
38
    )
29
39
from bzrlib.tests import (
30
40
    TestCase,
31
41
    TestCaseWithTransport,
32
 
    condition_isinstance,
33
 
    multiply_tests,
34
 
    split_suite_by_condition,
35
42
    )
36
 
from bzrlib.tests.per_workingtree import workingtree_formats
37
 
 
38
 
 
39
 
def load_tests(standard_tests, module, loader):
40
 
    """Parameterise some inventory tests."""
41
 
    to_adapt, result = split_suite_by_condition(standard_tests,
42
 
        condition_isinstance(TestDeltaApplication))
 
43
from bzrlib.tests.scenarios import load_tests_apply_scenarios
 
44
 
 
45
 
 
46
load_tests = load_tests_apply_scenarios
 
47
 
 
48
 
 
49
def delta_application_scenarios():
43
50
    scenarios = [
44
51
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
45
52
        ]
50
57
    # just creating trees.
51
58
    formats = set()
52
59
    for _, format in repository.format_registry.iteritems():
53
 
        scenarios.append((str(format.__name__), {
54
 
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
55
 
            'format':format}))
56
 
    for format in workingtree_formats():
 
60
        if format.supports_full_versioned_files:
 
61
            scenarios.append((str(format.__name__), {
 
62
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
63
                'format':format}))
 
64
    for format in workingtree.format_registry._get_all():
 
65
        repo_fmt = format._matchingbzrdir.repository_format
 
66
        if not repo_fmt.supports_full_versioned_files:
 
67
            continue
57
68
        scenarios.append(
58
69
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
59
70
            'apply_delta':apply_inventory_WT_basis,
62
73
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
63
74
            'apply_delta':apply_inventory_WT,
64
75
            'format':format}))
65
 
    return multiply_tests(to_adapt, scenarios, result)
66
 
 
67
 
 
68
 
def apply_inventory_Inventory(self, basis, delta):
 
76
    return scenarios
 
77
 
 
78
 
 
79
def create_texts_for_inv(repo, inv):
 
80
    for path, ie in inv.iter_entries():
 
81
        if ie.text_size:
 
82
            lines = ['a' * ie.text_size]
 
83
        else:
 
84
            lines = []
 
85
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
86
 
 
87
 
 
88
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
69
89
    """Apply delta to basis and return the result.
70
 
    
 
90
 
71
91
    :param basis: An inventory to be used as the basis.
72
92
    :param delta: The inventory delta to apply:
73
93
    :return: An inventory resulting from the application.
76
96
    return basis
77
97
 
78
98
 
79
 
def apply_inventory_WT(self, basis, delta):
 
99
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
80
100
    """Apply delta to basis and return the result.
81
101
 
82
102
    This sets the tree state to be basis, and then calls apply_inventory_delta.
83
 
    
 
103
 
84
104
    :param basis: An inventory to be used as the basis.
85
105
    :param delta: The inventory delta to apply:
86
106
    :return: An inventory resulting from the application.
105
125
    tree = tree.bzrdir.open_workingtree()
106
126
    tree.lock_read()
107
127
    self.addCleanup(tree.unlock)
108
 
    # One could add 'tree._validate' here but that would cause 'early' failues 
109
 
    # as far as higher level code is concerned. Possibly adding an
110
 
    # expect_fail parameter to this function and if that is False then do a
111
 
    # validate call.
112
 
    return tree.inventory
113
 
 
114
 
 
115
 
def apply_inventory_WT_basis(self, basis, delta):
 
128
    if not invalid_delta:
 
129
        tree._validate()
 
130
    return tree.root_inventory
 
131
 
 
132
 
 
133
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
134
    repo.start_write_group()
 
135
    try:
 
136
        rev = revision.Revision('basis', timestamp=0, timezone=None,
 
137
            message="", committer="foo@example.com")
 
138
        basis.revision_id = 'basis'
 
139
        create_texts_for_inv(repo, basis)
 
140
        repo.add_revision('basis', rev, basis)
 
141
        if invalid_delta:
 
142
            # We don't want to apply the delta to the basis, because we expect
 
143
            # the delta is invalid.
 
144
            result_inv = basis
 
145
            result_inv.revision_id = 'result'
 
146
            target_entries = None
 
147
        else:
 
148
            result_inv = basis.create_by_apply_delta(delta, 'result')
 
149
            create_texts_for_inv(repo, result_inv)
 
150
            target_entries = list(result_inv.iter_entries_by_dir())
 
151
        rev = revision.Revision('result', timestamp=0, timezone=None,
 
152
            message="", committer="foo@example.com")
 
153
        repo.add_revision('result', rev, result_inv)
 
154
        repo.commit_write_group()
 
155
    except:
 
156
        repo.abort_write_group()
 
157
        raise
 
158
    return target_entries
 
159
 
 
160
 
 
161
def _get_basis_entries(tree):
 
162
    basis_tree = tree.basis_tree()
 
163
    basis_tree.lock_read()
 
164
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
165
    basis_tree.unlock()
 
166
    return basis_tree_entries
 
167
 
 
168
 
 
169
def _populate_different_tree(tree, basis, delta):
 
170
    """Put all entries into tree, but at a unique location."""
 
171
    added_ids = set()
 
172
    added_paths = set()
 
173
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
 
174
    for path, ie in basis.iter_entries_by_dir():
 
175
        if ie.file_id in added_ids:
 
176
            continue
 
177
        # We want a unique path for each of these, we use the file-id
 
178
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
179
        added_ids.add(ie.file_id)
 
180
    for old_path, new_path, file_id, ie in delta:
 
181
        if file_id in added_ids:
 
182
            continue
 
183
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
184
 
 
185
 
 
186
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
116
187
    """Apply delta to basis and return the result.
117
188
 
118
189
    This sets the parent and then calls update_basis_by_delta.
120
191
    allow safety checks made by the WT to succeed, and finally ensures that all
121
192
    items in the delta with a new path are present in the WT before calling
122
193
    update_basis_by_delta.
123
 
    
 
194
 
124
195
    :param basis: An inventory to be used as the basis.
125
196
    :param delta: The inventory delta to apply:
126
197
    :return: An inventory resulting from the application.
127
198
    """
128
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
199
    control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
129
200
    control.create_repository()
130
201
    control.create_branch()
131
 
    tree = self.format.initialize(control)
 
202
    tree = test.format.initialize(control)
132
203
    tree.lock_write()
133
204
    try:
134
 
        repo = tree.branch.repository
135
 
        repo.start_write_group()
136
 
        try:
137
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
138
 
                message="", committer="foo@example.com")
139
 
            basis.revision_id = 'basis'
140
 
            repo.add_revision('basis', rev, basis)
141
 
            # Add a revision for the result, with the basis content - 
142
 
            # update_basis_by_delta doesn't check that the delta results in
143
 
            # result, and we want inconsistent deltas to get called on the
144
 
            # tree, or else the code isn't actually checked.
145
 
            rev = revision.Revision('result', timestamp=0, timezone=None,
146
 
                message="", committer="foo@example.com")
147
 
            basis.revision_id = 'result'
148
 
            repo.add_revision('result', rev, basis)
149
 
        except:
150
 
            repo.abort_write_group()
151
 
            raise
152
 
        else:
153
 
            repo.commit_write_group()
 
205
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
206
                                                delta, invalid_delta)
154
207
        # Set the basis state as the trees current state
155
208
        tree._write_inventory(basis)
156
209
        # This reads basis from the repo and puts it into the tree's local
157
210
        # cache, if it has one.
158
211
        tree.set_parent_ids(['basis'])
159
 
        paths = {}
160
 
        parents = set()
161
 
        for old, new, id, entry in delta:
162
 
            if None in (new, entry):
163
 
                continue
164
 
            paths[new] = (entry.file_id, entry.kind)
165
 
            parents.add(osutils.dirname(new))
166
 
        parents = osutils.minimum_path_selection(parents)
167
 
        parents.discard('')
168
 
        # Put place holders in the tree to permit adding the other entries.
169
 
        for pos, parent in enumerate(parents):
170
 
            if not tree.path2id(parent):
171
 
                # add a synthetic directory in the tree so we can can put the
172
 
                # tree0 entries in place for dirstate.
173
 
                tree.add([parent], ["id%d" % pos], ["directory"])
174
 
        if paths:
175
 
            # Many deltas may cause this mini-apply to fail, but we want to see what
176
 
            # the delta application code says, not the prep that we do to deal with 
177
 
            # limitations of dirstate's update_basis code.
178
 
            for path, (file_id, kind) in sorted(paths.items()):
179
 
                try:
180
 
                    tree.add([path], [file_id], [kind])
181
 
                except (KeyboardInterrupt, SystemExit):
182
 
                    raise
183
 
                except:
184
 
                    pass
185
212
    finally:
186
213
        tree.unlock()
187
214
    # Fresh lock, reads disk again.
188
215
    tree.lock_write()
189
216
    try:
190
217
        tree.update_basis_by_delta('result', delta)
 
218
        if not invalid_delta:
 
219
            tree._validate()
191
220
    finally:
192
221
        tree.unlock()
193
222
    # reload tree - ensure we get what was written.
194
223
    tree = tree.bzrdir.open_workingtree()
195
224
    basis_tree = tree.basis_tree()
196
225
    basis_tree.lock_read()
197
 
    self.addCleanup(basis_tree.unlock)
198
 
    # Note, that if the tree does not have a local cache, the trick above of
199
 
    # setting the result as the basis, will come back to bite us. That said,
200
 
    # all the implementations in bzr do have a local cache.
201
 
    return basis_tree.inventory
202
 
 
203
 
 
204
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
226
    test.addCleanup(basis_tree.unlock)
 
227
    basis_inv = basis_tree.root_inventory
 
228
    if target_entries:
 
229
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
230
        test.assertEqual(target_entries, basis_entries)
 
231
    return basis_inv
 
232
 
 
233
 
 
234
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
235
                                                      invalid_delta=True):
205
236
    """Apply delta to basis and return the result.
206
237
    
207
238
    This inserts basis as a whole inventory and then uses
221
252
            rev = revision.Revision('basis', timestamp=0, timezone=None,
222
253
                message="", committer="foo@example.com")
223
254
            basis.revision_id = 'basis'
 
255
            create_texts_for_inv(repo, basis)
224
256
            repo.add_revision('basis', rev, basis)
 
257
            repo.commit_write_group()
225
258
        except:
226
259
            repo.abort_write_group()
227
260
            raise
228
 
        else:
229
 
            repo.commit_write_group()
230
261
    finally:
231
262
        repo.unlock()
232
263
    repo.lock_write()
249
280
    return repo.get_inventory('result')
250
281
 
251
282
 
 
283
class TestInventoryUpdates(TestCase):
 
284
 
 
285
    def test_creation_from_root_id(self):
 
286
        # iff a root id is passed to the constructor, a root directory is made
 
287
        inv = inventory.Inventory(root_id='tree-root')
 
288
        self.assertNotEqual(None, inv.root)
 
289
        self.assertEqual('tree-root', inv.root.file_id)
 
290
 
 
291
    def test_add_path_of_root(self):
 
292
        # if no root id is given at creation time, there is no root directory
 
293
        inv = inventory.Inventory(root_id=None)
 
294
        self.assertIs(None, inv.root)
 
295
        # add a root entry by adding its path
 
296
        ie = inv.add_path("", "directory", "my-root")
 
297
        ie.revision = 'test-rev'
 
298
        self.assertEqual("my-root", ie.file_id)
 
299
        self.assertIs(ie, inv.root)
 
300
 
 
301
    def test_add_path(self):
 
302
        inv = inventory.Inventory(root_id='tree_root')
 
303
        ie = inv.add_path('hello', 'file', 'hello-id')
 
304
        self.assertEqual('hello-id', ie.file_id)
 
305
        self.assertEqual('file', ie.kind)
 
306
 
 
307
    def test_copy(self):
 
308
        """Make sure copy() works and creates a deep copy."""
 
309
        inv = inventory.Inventory(root_id='some-tree-root')
 
310
        ie = inv.add_path('hello', 'file', 'hello-id')
 
311
        inv2 = inv.copy()
 
312
        inv.root.file_id = 'some-new-root'
 
313
        ie.name = 'file2'
 
314
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
315
        self.assertEqual('hello', inv2['hello-id'].name)
 
316
 
 
317
    def test_copy_empty(self):
 
318
        """Make sure an empty inventory can be copied."""
 
319
        inv = inventory.Inventory(root_id=None)
 
320
        inv2 = inv.copy()
 
321
        self.assertIs(None, inv2.root)
 
322
 
 
323
    def test_copy_copies_root_revision(self):
 
324
        """Make sure the revision of the root gets copied."""
 
325
        inv = inventory.Inventory(root_id='someroot')
 
326
        inv.root.revision = 'therev'
 
327
        inv2 = inv.copy()
 
328
        self.assertEqual('someroot', inv2.root.file_id)
 
329
        self.assertEqual('therev', inv2.root.revision)
 
330
 
 
331
    def test_create_tree_reference(self):
 
332
        inv = inventory.Inventory('tree-root-123')
 
333
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
334
                              revision='rev', reference_revision='rev2'))
 
335
 
 
336
    def test_error_encoding(self):
 
337
        inv = inventory.Inventory('tree-root')
 
338
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
339
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
340
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
341
        self.assertContainsRe(str(e), r'\\u1234')
 
342
 
 
343
    def test_add_recursive(self):
 
344
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
345
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
346
        parent.children[child.file_id] = child
 
347
        inv = inventory.Inventory('tree-root')
 
348
        inv.add(parent)
 
349
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
350
 
 
351
 
 
352
 
252
353
class TestDeltaApplication(TestCaseWithTransport):
 
354
 
 
355
    scenarios = delta_application_scenarios()
253
356
 
254
357
    def get_empty_inventory(self, reference_inv=None):
255
358
        """Get an empty inventory.
270
373
            inv.root.revision = 'basis'
271
374
        return inv
272
375
 
 
376
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
 
377
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
378
        ie_file.revision = 'result'
 
379
        ie_file.text_size = 0
 
380
        ie_file.text_sha1 = ''
 
381
        return ie_file
 
382
 
273
383
    def test_empty_delta(self):
274
384
        inv = self.get_empty_inventory()
275
385
        delta = []
299
409
        file1.revision = 'result'
300
410
        file1.text_size = 0
301
411
        file1.text_sha1 = ""
302
 
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
303
 
        file2.revision = 'result'
304
 
        file2.text_size = 0
305
 
        file2.text_sha1 = ""
 
412
        file2 = file1.copy()
 
413
        file2.name = 'path2'
306
414
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
307
415
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
308
416
            inv, delta)
313
421
        file1.revision = 'result'
314
422
        file1.text_size = 0
315
423
        file1.text_sha1 = ""
316
 
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
317
 
        file2.revision = 'result'
318
 
        file2.text_size = 0
319
 
        file2.text_sha1 = ""
 
424
        file2 = file1.copy()
 
425
        file2.file_id = 'id2'
320
426
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
321
427
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
322
428
            inv, delta)
494
600
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
495
601
            inv, delta)
496
602
 
 
603
    def test_add_file(self):
 
604
        inv = self.get_empty_inventory()
 
605
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
606
        file1.revision = 'result'
 
607
        file1.text_size = 0
 
608
        file1.text_sha1 = ''
 
609
        delta = [(None, u'path', 'file-id', file1)]
 
610
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
611
        self.assertEqual('file-id', res_inv['file-id'].file_id)
 
612
 
 
613
    def test_remove_file(self):
 
614
        inv = self.get_empty_inventory()
 
615
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
616
        file1.revision = 'result'
 
617
        file1.text_size = 0
 
618
        file1.text_sha1 = ''
 
619
        inv.add(file1)
 
620
        delta = [(u'path', None, 'file-id', None)]
 
621
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
622
        self.assertEqual(None, res_inv.path2id('path'))
 
623
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
 
624
 
 
625
    def test_rename_file(self):
 
626
        inv = self.get_empty_inventory()
 
627
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
628
        inv.add(file1)
 
629
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
630
        delta = [(u'path', 'path2', 'file-id', file2)]
 
631
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
632
        self.assertEqual(None, res_inv.path2id('path'))
 
633
        self.assertEqual('file-id', res_inv.path2id('path2'))
 
634
 
 
635
    def test_replaced_at_new_path(self):
 
636
        inv = self.get_empty_inventory()
 
637
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
 
638
        inv.add(file1)
 
639
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
 
640
        delta = [(u'name', None, 'id1', None),
 
641
                 (None, u'name', 'id2', file2)]
 
642
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
643
        self.assertEqual('id2', res_inv.path2id('name'))
 
644
 
 
645
    def test_rename_dir(self):
 
646
        inv = self.get_empty_inventory()
 
647
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
648
        dir1.revision = 'basis'
 
649
        file1 = self.make_file_ie(parent_id='dir-id')
 
650
        inv.add(dir1)
 
651
        inv.add(file1)
 
652
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
653
        dir2.revision = 'result'
 
654
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
 
655
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
656
        # The file should be accessible under the new path
 
657
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
 
658
 
 
659
    def test_renamed_dir_with_renamed_child(self):
 
660
        inv = self.get_empty_inventory()
 
661
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
662
        dir1.revision = 'basis'
 
663
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
 
664
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
 
665
        inv.add(dir1)
 
666
        inv.add(file1)
 
667
        inv.add(file2)
 
668
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
669
        dir2.revision = 'result'
 
670
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
 
671
        delta = [('dir1', 'dir2', 'dir-id', dir2),
 
672
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
 
673
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
674
        # The file should be accessible under the new path
 
675
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
 
676
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
677
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
 
678
 
 
679
    def test_is_root(self):
 
680
        """Ensure our root-checking code is accurate."""
 
681
        inv = inventory.Inventory('TREE_ROOT')
 
682
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
683
        self.assertFalse(inv.is_root('booga'))
 
684
        inv.root.file_id = 'booga'
 
685
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
686
        self.assertTrue(inv.is_root('booga'))
 
687
        # works properly even if no root is set
 
688
        inv.root = None
 
689
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
690
        self.assertFalse(inv.is_root('booga'))
 
691
 
 
692
    def test_entries_for_empty_inventory(self):
 
693
        """Test that entries() will not fail for an empty inventory"""
 
694
        inv = Inventory(root_id=None)
 
695
        self.assertEqual([], inv.entries())
 
696
 
497
697
 
498
698
class TestInventoryEntry(TestCase):
499
699
 
511
711
 
512
712
    def test_dir_detect_changes(self):
513
713
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
514
 
        left.text_sha1 = 123
515
 
        left.executable = True
516
 
        left.symlink_target='foo'
517
714
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
518
 
        right.text_sha1 = 321
519
 
        right.symlink_target='bar'
520
715
        self.assertEqual((False, False), left.detect_changes(right))
521
716
        self.assertEqual((False, False), right.detect_changes(left))
522
717
 
536
731
 
537
732
    def test_symlink_detect_changes(self):
538
733
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
539
 
        left.text_sha1 = 123
540
 
        left.executable = True
541
734
        left.symlink_target='foo'
542
735
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
543
 
        right.text_sha1 = 321
544
736
        right.symlink_target='foo'
545
737
        self.assertEqual((False, False), left.detect_changes(right))
546
738
        self.assertEqual((False, False), right.detect_changes(left))
550
742
 
551
743
    def test_file_has_text(self):
552
744
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
553
 
        self.failUnless(file.has_text())
 
745
        self.assertTrue(file.has_text())
554
746
 
555
747
    def test_directory_has_text(self):
556
748
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
557
 
        self.failIf(dir.has_text())
 
749
        self.assertFalse(dir.has_text())
558
750
 
559
751
    def test_link_has_text(self):
560
752
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
561
 
        self.failIf(link.has_text())
 
753
        self.assertFalse(link.has_text())
562
754
 
563
755
    def test_make_entry(self):
564
756
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
642
834
        self.assertEqual(expected_change, change)
643
835
 
644
836
 
645
 
class TestCHKInventory(TestCaseWithTransport):
 
837
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
646
838
 
647
839
    def get_chk_bytes(self):
648
 
        # The easiest way to get a CHK store is a development6 repository and
649
 
        # then work with the chk_bytes attribute directly.
650
 
        repo = self.make_repository(".", format="development6-rich-root")
651
 
        repo.lock_write()
652
 
        self.addCleanup(repo.unlock)
653
 
        repo.start_write_group()
654
 
        self.addCleanup(repo.abort_write_group)
655
 
        return repo.chk_bytes
 
840
        factory = groupcompress.make_pack_factory(True, True, 1)
 
841
        trans = self.get_transport('')
 
842
        return factory(trans)
656
843
 
657
844
    def read_bytes(self, chk_bytes, key):
658
845
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
914
1101
        delta = [("", None, base_inv.root.file_id, None),
915
1102
            (None, "",  "myrootid", inv.root)]
916
1103
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
917
 
        self.assertEquals(reference_inv.root, new_inv.root)
 
1104
        self.assertEqual(reference_inv.root, new_inv.root)
918
1105
 
919
1106
    def test_create_by_apply_delta_empty_add_child(self):
920
1107
        inv = Inventory()
1123
1310
        self.assertIsInstance(ie2.name, unicode)
1124
1311
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1125
1312
                         inv._bytes_to_utf8name_key(bytes))
 
1313
 
 
1314
    def make_basic_utf8_inventory(self):
 
1315
        inv = Inventory()
 
1316
        inv.revision_id = "revid"
 
1317
        inv.root.revision = "rootrev"
 
1318
        root_id = inv.root.file_id
 
1319
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
 
1320
        inv["fileid"].revision = "filerev"
 
1321
        inv["fileid"].text_sha1 = "ffff"
 
1322
        inv["fileid"].text_size = 0
 
1323
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
 
1324
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
 
1325
        inv["childid"].revision = "filerev"
 
1326
        inv["childid"].text_sha1 = "ffff"
 
1327
        inv["childid"].text_size = 0
 
1328
        chk_bytes = self.get_chk_bytes()
 
1329
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1330
        bytes = ''.join(chk_inv.to_lines())
 
1331
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1332
 
 
1333
    def test__preload_handles_utf8(self):
 
1334
        new_inv = self.make_basic_utf8_inventory()
 
1335
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1336
        self.assertFalse(new_inv._fully_cached)
 
1337
        new_inv._preload_cache()
 
1338
        self.assertEqual(
 
1339
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
 
1340
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1341
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
 
1342
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1343
                         sorted(ie_root._children.keys()))
 
1344
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1345
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
 
1346
 
 
1347
    def test__preload_populates_cache(self):
 
1348
        inv = Inventory()
 
1349
        inv.revision_id = "revid"
 
1350
        inv.root.revision = "rootrev"
 
1351
        root_id = inv.root.file_id
 
1352
        inv.add(InventoryFile("fileid", "file", root_id))
 
1353
        inv["fileid"].revision = "filerev"
 
1354
        inv["fileid"].executable = True
 
1355
        inv["fileid"].text_sha1 = "ffff"
 
1356
        inv["fileid"].text_size = 1
 
1357
        inv.add(InventoryDirectory("dirid", "dir", root_id))
 
1358
        inv.add(InventoryFile("childid", "child", "dirid"))
 
1359
        inv["childid"].revision = "filerev"
 
1360
        inv["childid"].executable = False
 
1361
        inv["childid"].text_sha1 = "dddd"
 
1362
        inv["childid"].text_size = 1
 
1363
        chk_bytes = self.get_chk_bytes()
 
1364
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1365
        bytes = ''.join(chk_inv.to_lines())
 
1366
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1367
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
 
1368
        self.assertFalse(new_inv._fully_cached)
 
1369
        new_inv._preload_cache()
 
1370
        self.assertEqual(
 
1371
            sorted([root_id, "fileid", "dirid", "childid"]),
 
1372
            sorted(new_inv._fileid_to_entry_cache.keys()))
 
1373
        self.assertTrue(new_inv._fully_cached)
 
1374
        ie_root = new_inv._fileid_to_entry_cache[root_id]
 
1375
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
 
1376
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
 
1377
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
 
1378
 
 
1379
    def test__preload_handles_partially_evaluated_inventory(self):
 
1380
        new_inv = self.make_basic_utf8_inventory()
 
1381
        ie = new_inv[new_inv.root_id]
 
1382
        self.assertIs(None, ie._children)
 
1383
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1384
                         sorted(ie.children.keys()))
 
1385
        # Accessing .children loads _children
 
1386
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1387
                         sorted(ie._children.keys()))
 
1388
        new_inv._preload_cache()
 
1389
        # No change
 
1390
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
 
1391
                         sorted(ie._children.keys()))
 
1392
        ie_dir = new_inv["dirid"]
 
1393
        self.assertEqual([u'ch\xefld'],
 
1394
                         sorted(ie_dir._children.keys()))
 
1395
 
 
1396
    def test_filter_change_in_renamed_subfolder(self):
 
1397
        inv = Inventory('tree-root')
 
1398
        src_ie = inv.add_path('src', 'directory', 'src-id')
 
1399
        inv.add_path('src/sub/', 'directory', 'sub-id')
 
1400
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
 
1401
        a_ie.text_sha1 = osutils.sha_string('content\n')
 
1402
        a_ie.text_size = len('content\n')
 
1403
        chk_bytes = self.get_chk_bytes()
 
1404
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1405
        inv = inv.create_by_apply_delta([
 
1406
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
 
1407
            ("src", "src2", "src-id", src_ie),
 
1408
            ], 'new-rev-2')
 
1409
        new_inv = inv.filter(['a-id', 'src-id'])
 
1410
        self.assertEqual([
 
1411
            ('', 'tree-root'),
 
1412
            ('src', 'src-id'),
 
1413
            ('src/sub', 'sub-id'),
 
1414
            ('src/sub/a', 'a-id'),
 
1415
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
 
1416
 
 
1417
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1418
 
 
1419
    def get_chk_bytes(self):
 
1420
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1421
        trans = self.get_transport('')
 
1422
        return factory(trans)
 
1423
 
 
1424
    def make_dir(self, inv, name, parent_id):
 
1425
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1426
 
 
1427
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1428
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1429
        ie.text_sha1 = osutils.sha_string(content)
 
1430
        ie.text_size = len(content)
 
1431
        inv.add(ie)
 
1432
 
 
1433
    def make_simple_inventory(self):
 
1434
        inv = Inventory('TREE_ROOT')
 
1435
        inv.revision_id = "revid"
 
1436
        inv.root.revision = "rootrev"
 
1437
        # /                 TREE_ROOT
 
1438
        # dir1/             dir1-id
 
1439
        #   sub-file1       sub-file1-id
 
1440
        #   sub-file2       sub-file2-id
 
1441
        #   sub-dir1/       sub-dir1-id
 
1442
        #     subsub-file1  subsub-file1-id
 
1443
        # dir2/             dir2-id
 
1444
        #   sub2-file1      sub2-file1-id
 
1445
        # top               top-id
 
1446
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1447
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1448
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1449
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1450
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1451
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1452
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1453
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1454
        chk_bytes = self.get_chk_bytes()
 
1455
        #  use a small maximum_size to force internal paging structures
 
1456
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1457
                        maximum_size=100,
 
1458
                        search_key_name='hash-255-way')
 
1459
        bytes = ''.join(chk_inv.to_lines())
 
1460
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1461
 
 
1462
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1463
        self.assertEqual(sorted(expected_fileids),
 
1464
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1465
 
 
1466
    def assertExpand(self, all_ids, inv, file_ids):
 
1467
        (val_all_ids,
 
1468
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1469
        self.assertEqual(set(all_ids), val_all_ids)
 
1470
        entries = inv._getitems(val_all_ids)
 
1471
        expected_children = {}
 
1472
        for entry in entries:
 
1473
            s = expected_children.setdefault(entry.parent_id, [])
 
1474
            s.append(entry.file_id)
 
1475
        val_children = dict((k, sorted(v)) for k, v
 
1476
                            in val_children.iteritems())
 
1477
        expected_children = dict((k, sorted(v)) for k, v
 
1478
                            in expected_children.iteritems())
 
1479
        self.assertEqual(expected_children, val_children)
 
1480
 
 
1481
    def test_make_simple_inventory(self):
 
1482
        inv = self.make_simple_inventory()
 
1483
        layout = []
 
1484
        for path, entry in inv.iter_entries_by_dir():
 
1485
            layout.append((path, entry.file_id))
 
1486
        self.assertEqual([
 
1487
            ('', 'TREE_ROOT'),
 
1488
            ('dir1', 'dir1-id'),
 
1489
            ('dir2', 'dir2-id'),
 
1490
            ('top', 'top-id'),
 
1491
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1492
            ('dir1/sub-file1', 'sub-file1-id'),
 
1493
            ('dir1/sub-file2', 'sub-file2-id'),
 
1494
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1495
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1496
            ], layout)
 
1497
 
 
1498
    def test__getitems(self):
 
1499
        inv = self.make_simple_inventory()
 
1500
        # Reading from disk
 
1501
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1502
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1503
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1504
        # From cache
 
1505
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1506
        # Mixed
 
1507
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1508
                             ['dir1-id', 'sub-file2-id'])
 
1509
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1510
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1511
 
 
1512
    def test_single_file(self):
 
1513
        inv = self.make_simple_inventory()
 
1514
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1515
 
 
1516
    def test_get_all_parents(self):
 
1517
        inv = self.make_simple_inventory()
 
1518
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1519
                           'subsub-file1-id',
 
1520
                          ], inv, ['subsub-file1-id'])
 
1521
 
 
1522
    def test_get_children(self):
 
1523
        inv = self.make_simple_inventory()
 
1524
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1525
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1526
                          ], inv, ['dir1-id'])
 
1527
 
 
1528
    def test_from_root(self):
 
1529
        inv = self.make_simple_inventory()
 
1530
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1531
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1532
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1533
 
 
1534
    def test_top_level_file(self):
 
1535
        inv = self.make_simple_inventory()
 
1536
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1537
 
 
1538
    def test_subsub_file(self):
 
1539
        inv = self.make_simple_inventory()
 
1540
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1541
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1542
 
 
1543
    def test_sub_and_root(self):
 
1544
        inv = self.make_simple_inventory()
 
1545
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1546
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
1547
 
 
1548
 
 
1549
class TestMutableInventoryFromTree(TestCaseWithTransport):
 
1550
 
 
1551
    def test_empty(self):
 
1552
        repository = self.make_repository('.')
 
1553
        tree = repository.revision_tree(revision.NULL_REVISION)
 
1554
        inv = mutable_inventory_from_tree(tree)
 
1555
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
 
1556
        self.assertEqual(0, len(inv))
 
1557
 
 
1558
    def test_some_files(self):
 
1559
        wt = self.make_branch_and_tree('.')
 
1560
        self.build_tree(['a'])
 
1561
        wt.add(['a'], ['thefileid'])
 
1562
        revid = wt.commit("commit")
 
1563
        tree = wt.branch.repository.revision_tree(revid)
 
1564
        inv = mutable_inventory_from_tree(tree)
 
1565
        self.assertEqual(revid, inv.revision_id)
 
1566
        self.assertEqual(2, len(inv))
 
1567
        self.assertEqual("a", inv['thefileid'].name)
 
1568
        # The inventory should be mutable and independent of
 
1569
        # the original tree
 
1570
        self.assertFalse(tree.root_inventory['thefileid'].executable)
 
1571
        inv['thefileid'].executable = True
 
1572
        self.assertFalse(tree.root_inventory['thefileid'].executable)