~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

Abbreviate pack_stat struct format to '>6L'

Show diffs side-by-side

added added

removed removed

Lines of Context:
85
85
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
86
86
 
87
87
 
88
 
def apply_inventory_Inventory(self, basis, delta):
 
88
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
89
89
    """Apply delta to basis and return the result.
90
 
    
 
90
 
91
91
    :param basis: An inventory to be used as the basis.
92
92
    :param delta: The inventory delta to apply:
93
93
    :return: An inventory resulting from the application.
96
96
    return basis
97
97
 
98
98
 
99
 
def apply_inventory_WT(self, basis, delta):
 
99
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
100
100
    """Apply delta to basis and return the result.
101
101
 
102
102
    This sets the tree state to be basis, and then calls apply_inventory_delta.
103
 
    
 
103
 
104
104
    :param basis: An inventory to be used as the basis.
105
105
    :param delta: The inventory delta to apply:
106
106
    :return: An inventory resulting from the application.
125
125
    tree = tree.bzrdir.open_workingtree()
126
126
    tree.lock_read()
127
127
    self.addCleanup(tree.unlock)
128
 
    # One could add 'tree._validate' here but that would cause 'early' failues 
129
 
    # as far as higher level code is concerned. Possibly adding an
130
 
    # expect_fail parameter to this function and if that is False then do a
131
 
    # validate call.
 
128
    if not invalid_delta:
 
129
        tree._validate()
132
130
    return tree.inventory
133
131
 
134
132
 
135
 
def apply_inventory_WT_basis(self, basis, delta):
 
133
def _create_repo_revisions(repo, basis, delta, invalid_delta):
 
134
    repo.start_write_group()
 
135
    try:
 
136
        rev = revision.Revision('basis', timestamp=0, timezone=None,
 
137
            message="", committer="foo@example.com")
 
138
        basis.revision_id = 'basis'
 
139
        create_texts_for_inv(repo, basis)
 
140
        repo.add_revision('basis', rev, basis)
 
141
        if invalid_delta:
 
142
            # We don't want to apply the delta to the basis, because we expect
 
143
            # the delta is invalid.
 
144
            result_inv = basis
 
145
            result_inv.revision_id = 'result'
 
146
            target_entries = None
 
147
        else:
 
148
            result_inv = basis.create_by_apply_delta(delta, 'result')
 
149
            create_texts_for_inv(repo, result_inv)
 
150
            target_entries = list(result_inv.iter_entries_by_dir())
 
151
        rev = revision.Revision('result', timestamp=0, timezone=None,
 
152
            message="", committer="foo@example.com")
 
153
        repo.add_revision('result', rev, result_inv)
 
154
        repo.commit_write_group()
 
155
    except:
 
156
        repo.abort_write_group()
 
157
        raise
 
158
    return target_entries
 
159
 
 
160
 
 
161
def _get_basis_entries(tree):
 
162
    basis_tree = tree.basis_tree()
 
163
    basis_tree.lock_read()
 
164
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
 
165
    basis_tree.unlock()
 
166
    return basis_tree_entries
 
167
 
 
168
 
 
169
def _populate_different_tree(tree, basis, delta):
 
170
    """Put all entries into tree, but at a unique location."""
 
171
    added_ids = set()
 
172
    added_paths = set()
 
173
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
 
174
    for path, ie in basis.iter_entries_by_dir():
 
175
        if ie.file_id in added_ids:
 
176
            continue
 
177
        # We want a unique path for each of these, we use the file-id
 
178
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
 
179
        added_ids.add(ie.file_id)
 
180
    for old_path, new_path, file_id, ie in delta:
 
181
        if file_id in added_ids:
 
182
            continue
 
183
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
 
184
 
 
185
 
 
186
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
136
187
    """Apply delta to basis and return the result.
137
188
 
138
189
    This sets the parent and then calls update_basis_by_delta.
140
191
    allow safety checks made by the WT to succeed, and finally ensures that all
141
192
    items in the delta with a new path are present in the WT before calling
142
193
    update_basis_by_delta.
143
 
    
 
194
 
144
195
    :param basis: An inventory to be used as the basis.
145
196
    :param delta: The inventory delta to apply:
146
197
    :return: An inventory resulting from the application.
147
198
    """
148
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
199
    control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
149
200
    control.create_repository()
150
201
    control.create_branch()
151
 
    tree = self.format.initialize(control)
 
202
    tree = test.format.initialize(control)
152
203
    tree.lock_write()
153
204
    try:
154
 
        repo = tree.branch.repository
155
 
        repo.start_write_group()
156
 
        try:
157
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
158
 
                message="", committer="foo@example.com")
159
 
            basis.revision_id = 'basis'
160
 
            create_texts_for_inv(tree.branch.repository, basis)
161
 
            repo.add_revision('basis', rev, basis)
162
 
            # Add a revision for the result, with the basis content - 
163
 
            # update_basis_by_delta doesn't check that the delta results in
164
 
            # result, and we want inconsistent deltas to get called on the
165
 
            # tree, or else the code isn't actually checked.
166
 
            rev = revision.Revision('result', timestamp=0, timezone=None,
167
 
                message="", committer="foo@example.com")
168
 
            basis.revision_id = 'result'
169
 
            repo.add_revision('result', rev, basis)
170
 
            repo.commit_write_group()
171
 
        except:
172
 
            repo.abort_write_group()
173
 
            raise
 
205
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
 
206
                                                delta, invalid_delta)
174
207
        # Set the basis state as the trees current state
175
208
        tree._write_inventory(basis)
176
209
        # This reads basis from the repo and puts it into the tree's local
177
210
        # cache, if it has one.
178
211
        tree.set_parent_ids(['basis'])
179
 
        paths = {}
180
 
        parents = set()
181
 
        for old, new, id, entry in delta:
182
 
            if None in (new, entry):
183
 
                continue
184
 
            paths[new] = (entry.file_id, entry.kind)
185
 
            parents.add(osutils.dirname(new))
186
 
        parents = osutils.minimum_path_selection(parents)
187
 
        parents.discard('')
188
 
        # Put place holders in the tree to permit adding the other entries.
189
 
        for pos, parent in enumerate(parents):
190
 
            if not tree.path2id(parent):
191
 
                # add a synthetic directory in the tree so we can can put the
192
 
                # tree0 entries in place for dirstate.
193
 
                tree.add([parent], ["id%d" % pos], ["directory"])
194
 
        if paths:
195
 
            # Many deltas may cause this mini-apply to fail, but we want to see what
196
 
            # the delta application code says, not the prep that we do to deal with 
197
 
            # limitations of dirstate's update_basis code.
198
 
            for path, (file_id, kind) in sorted(paths.items()):
199
 
                try:
200
 
                    tree.add([path], [file_id], [kind])
201
 
                except (KeyboardInterrupt, SystemExit):
202
 
                    raise
203
 
                except:
204
 
                    pass
205
212
    finally:
206
213
        tree.unlock()
207
214
    # Fresh lock, reads disk again.
208
215
    tree.lock_write()
209
216
    try:
210
217
        tree.update_basis_by_delta('result', delta)
 
218
        if not invalid_delta:
 
219
            tree._validate()
211
220
    finally:
212
221
        tree.unlock()
213
222
    # reload tree - ensure we get what was written.
214
223
    tree = tree.bzrdir.open_workingtree()
215
224
    basis_tree = tree.basis_tree()
216
225
    basis_tree.lock_read()
217
 
    self.addCleanup(basis_tree.unlock)
218
 
    # Note, that if the tree does not have a local cache, the trick above of
219
 
    # setting the result as the basis, will come back to bite us. That said,
220
 
    # all the implementations in bzr do have a local cache.
221
 
    return basis_tree.inventory
222
 
 
223
 
 
224
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
226
    test.addCleanup(basis_tree.unlock)
 
227
    basis_inv = basis_tree.inventory
 
228
    if target_entries:
 
229
        basis_entries = list(basis_inv.iter_entries_by_dir())
 
230
        test.assertEqual(target_entries, basis_entries)
 
231
    return basis_inv
 
232
 
 
233
 
 
234
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
 
235
                                                      invalid_delta=True):
225
236
    """Apply delta to basis and return the result.
226
237
    
227
238
    This inserts basis as a whole inventory and then uses
362
373
            inv.root.revision = 'basis'
363
374
        return inv
364
375
 
 
376
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
 
377
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
 
378
        ie_file.revision = 'result'
 
379
        ie_file.text_size = 0
 
380
        ie_file.text_sha1 = ''
 
381
        return ie_file
 
382
 
365
383
    def test_empty_delta(self):
366
384
        inv = self.get_empty_inventory()
367
385
        delta = []
391
409
        file1.revision = 'result'
392
410
        file1.text_size = 0
393
411
        file1.text_sha1 = ""
394
 
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
395
 
        file2.revision = 'result'
396
 
        file2.text_size = 0
397
 
        file2.text_sha1 = ""
 
412
        file2 = file1.copy()
 
413
        file2.name = 'path2'
398
414
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
399
415
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
400
416
            inv, delta)
405
421
        file1.revision = 'result'
406
422
        file1.text_size = 0
407
423
        file1.text_sha1 = ""
408
 
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
409
 
        file2.revision = 'result'
410
 
        file2.text_size = 0
411
 
        file2.text_sha1 = ""
 
424
        file2 = file1.copy()
 
425
        file2.file_id = 'id2'
412
426
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
413
427
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
414
428
            inv, delta)
586
600
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
587
601
            inv, delta)
588
602
 
589
 
 
590
 
class TestInventory(TestCase):
 
603
    def test_add_file(self):
 
604
        inv = self.get_empty_inventory()
 
605
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
606
        file1.revision = 'result'
 
607
        file1.text_size = 0
 
608
        file1.text_sha1 = ''
 
609
        delta = [(None, u'path', 'file-id', file1)]
 
610
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
611
        self.assertEqual('file-id', res_inv['file-id'].file_id)
 
612
 
 
613
    def test_remove_file(self):
 
614
        inv = self.get_empty_inventory()
 
615
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
 
616
        file1.revision = 'result'
 
617
        file1.text_size = 0
 
618
        file1.text_sha1 = ''
 
619
        inv.add(file1)
 
620
        delta = [(u'path', None, 'file-id', None)]
 
621
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
622
        self.assertEqual(None, res_inv.path2id('path'))
 
623
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
 
624
 
 
625
    def test_rename_file(self):
 
626
        inv = self.get_empty_inventory()
 
627
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
 
628
        inv.add(file1)
 
629
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
 
630
        delta = [(u'path', 'path2', 'file-id', file2)]
 
631
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
632
        self.assertEqual(None, res_inv.path2id('path'))
 
633
        self.assertEqual('file-id', res_inv.path2id('path2'))
 
634
 
 
635
    def test_replaced_at_new_path(self):
 
636
        inv = self.get_empty_inventory()
 
637
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
 
638
        inv.add(file1)
 
639
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
 
640
        delta = [(u'name', None, 'id1', None),
 
641
                 (None, u'name', 'id2', file2)]
 
642
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
643
        self.assertEqual('id2', res_inv.path2id('name'))
 
644
 
 
645
    def test_rename_dir(self):
 
646
        inv = self.get_empty_inventory()
 
647
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
648
        dir1.revision = 'basis'
 
649
        file1 = self.make_file_ie(parent_id='dir-id')
 
650
        inv.add(dir1)
 
651
        inv.add(file1)
 
652
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
653
        dir2.revision = 'result'
 
654
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
 
655
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
656
        # The file should be accessible under the new path
 
657
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
 
658
 
 
659
    def test_renamed_dir_with_renamed_child(self):
 
660
        inv = self.get_empty_inventory()
 
661
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
 
662
        dir1.revision = 'basis'
 
663
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
 
664
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
 
665
        inv.add(dir1)
 
666
        inv.add(file1)
 
667
        inv.add(file2)
 
668
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
 
669
        dir2.revision = 'result'
 
670
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
 
671
        delta = [('dir1', 'dir2', 'dir-id', dir2),
 
672
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
 
673
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
 
674
        # The file should be accessible under the new path
 
675
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
 
676
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
 
677
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
591
678
 
592
679
    def test_is_root(self):
593
680
        """Ensure our root-checking code is accurate."""
1306
1393
        self.assertEqual([u'ch\xefld'],
1307
1394
                         sorted(ie_dir._children.keys()))
1308
1395
 
 
1396
    def test_filter_change_in_renamed_subfolder(self):
 
1397
        inv = Inventory('tree-root')
 
1398
        src_ie = inv.add_path('src', 'directory', 'src-id')
 
1399
        inv.add_path('src/sub/', 'directory', 'sub-id')
 
1400
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
 
1401
        a_ie.text_sha1 = osutils.sha_string('content\n')
 
1402
        a_ie.text_size = len('content\n')
 
1403
        chk_bytes = self.get_chk_bytes()
 
1404
        inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1405
        inv = inv.create_by_apply_delta([
 
1406
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
 
1407
            ("src", "src2", "src-id", src_ie),
 
1408
            ], 'new-rev-2')
 
1409
        new_inv = inv.filter(['a-id', 'src-id'])
 
1410
        self.assertEqual([
 
1411
            ('', 'tree-root'),
 
1412
            ('src', 'src-id'),
 
1413
            ('src/sub', 'sub-id'),
 
1414
            ('src/sub/a', 'a-id'),
 
1415
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1309
1416
 
1310
1417
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1311
1418