~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Ian Clatworthy
  • Date: 2009-09-09 15:30:59 UTC
  • mto: (4634.37.2 prepare-2.0)
  • mto: This revision was merged to the branch mainline in revision 4689.
  • Revision ID: ian.clatworthy@canonical.com-20090909153059-sb038agvd38ci2q8
more link fixes in the User Guide

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2012, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006, 2007 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
17
17
 
18
18
from bzrlib import (
19
19
    chk_map,
20
 
    groupcompress,
 
20
    bzrdir,
21
21
    errors,
22
22
    inventory,
23
23
    osutils,
24
24
    repository,
25
25
    revision,
26
 
    tests,
27
 
    workingtree,
28
 
    )
29
 
from bzrlib.inventory import (
30
 
    CHKInventory,
31
 
    Inventory,
32
 
    ROOT_ID,
33
 
    InventoryFile,
34
 
    InventoryDirectory,
35
 
    InventoryEntry,
36
 
    TreeReference,
37
 
    mutable_inventory_from_tree,
38
 
    )
 
26
    )
 
27
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
 
28
    InventoryDirectory, InventoryEntry, TreeReference)
39
29
from bzrlib.tests import (
40
30
    TestCase,
41
31
    TestCaseWithTransport,
 
32
    condition_isinstance,
 
33
    multiply_tests,
 
34
    split_suite_by_condition,
42
35
    )
43
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
44
 
 
45
 
 
46
 
load_tests = load_tests_apply_scenarios
47
 
 
48
 
 
49
 
def delta_application_scenarios():
 
36
from bzrlib.tests.per_workingtree import workingtree_formats
 
37
 
 
38
 
 
39
def load_tests(standard_tests, module, loader):
 
40
    """Parameterise some inventory tests."""
 
41
    to_adapt, result = split_suite_by_condition(standard_tests,
 
42
        condition_isinstance(TestDeltaApplication))
50
43
    scenarios = [
51
44
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
52
45
        ]
57
50
    # just creating trees.
58
51
    formats = set()
59
52
    for _, format in repository.format_registry.iteritems():
60
 
        if format.supports_full_versioned_files:
61
 
            scenarios.append((str(format.__name__), {
62
 
                'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
63
 
                'format':format}))
64
 
    for format in workingtree.format_registry._get_all():
65
 
        repo_fmt = format._matchingbzrdir.repository_format
66
 
        if not repo_fmt.supports_full_versioned_files:
67
 
            continue
 
53
        scenarios.append((str(format.__name__), {
 
54
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
55
            'format':format}))
 
56
    for format in workingtree_formats():
68
57
        scenarios.append(
69
58
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
70
59
            'apply_delta':apply_inventory_WT_basis,
73
62
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
74
63
            'apply_delta':apply_inventory_WT,
75
64
            'format':format}))
76
 
    return scenarios
77
 
 
78
 
 
79
 
def create_texts_for_inv(repo, inv):
80
 
    for path, ie in inv.iter_entries():
81
 
        if ie.text_size:
82
 
            lines = ['a' * ie.text_size]
83
 
        else:
84
 
            lines = []
85
 
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
86
 
 
87
 
 
88
 
def apply_inventory_Inventory(self, basis, delta, invalid_delta=True):
 
65
    return multiply_tests(to_adapt, scenarios, result)
 
66
 
 
67
 
 
68
def apply_inventory_Inventory(self, basis, delta):
89
69
    """Apply delta to basis and return the result.
90
 
 
 
70
    
91
71
    :param basis: An inventory to be used as the basis.
92
72
    :param delta: The inventory delta to apply:
93
73
    :return: An inventory resulting from the application.
96
76
    return basis
97
77
 
98
78
 
99
 
def apply_inventory_WT(self, basis, delta, invalid_delta=True):
 
79
def apply_inventory_WT(self, basis, delta):
100
80
    """Apply delta to basis and return the result.
101
81
 
102
82
    This sets the tree state to be basis, and then calls apply_inventory_delta.
103
 
 
 
83
    
104
84
    :param basis: An inventory to be used as the basis.
105
85
    :param delta: The inventory delta to apply:
106
86
    :return: An inventory resulting from the application.
125
105
    tree = tree.bzrdir.open_workingtree()
126
106
    tree.lock_read()
127
107
    self.addCleanup(tree.unlock)
128
 
    if not invalid_delta:
129
 
        tree._validate()
130
 
    return tree.root_inventory
131
 
 
132
 
 
133
 
def _create_repo_revisions(repo, basis, delta, invalid_delta):
134
 
    repo.start_write_group()
135
 
    try:
136
 
        rev = revision.Revision('basis', timestamp=0, timezone=None,
137
 
            message="", committer="foo@example.com")
138
 
        basis.revision_id = 'basis'
139
 
        create_texts_for_inv(repo, basis)
140
 
        repo.add_revision('basis', rev, basis)
141
 
        if invalid_delta:
142
 
            # We don't want to apply the delta to the basis, because we expect
143
 
            # the delta is invalid.
144
 
            result_inv = basis
145
 
            result_inv.revision_id = 'result'
146
 
            target_entries = None
147
 
        else:
148
 
            result_inv = basis.create_by_apply_delta(delta, 'result')
149
 
            create_texts_for_inv(repo, result_inv)
150
 
            target_entries = list(result_inv.iter_entries_by_dir())
151
 
        rev = revision.Revision('result', timestamp=0, timezone=None,
152
 
            message="", committer="foo@example.com")
153
 
        repo.add_revision('result', rev, result_inv)
154
 
        repo.commit_write_group()
155
 
    except:
156
 
        repo.abort_write_group()
157
 
        raise
158
 
    return target_entries
159
 
 
160
 
 
161
 
def _get_basis_entries(tree):
162
 
    basis_tree = tree.basis_tree()
163
 
    basis_tree.lock_read()
164
 
    basis_tree_entries = list(basis_tree.inventory.iter_entries_by_dir())
165
 
    basis_tree.unlock()
166
 
    return basis_tree_entries
167
 
 
168
 
 
169
 
def _populate_different_tree(tree, basis, delta):
170
 
    """Put all entries into tree, but at a unique location."""
171
 
    added_ids = set()
172
 
    added_paths = set()
173
 
    tree.add(['unique-dir'], ['unique-dir-id'], ['directory'])
174
 
    for path, ie in basis.iter_entries_by_dir():
175
 
        if ie.file_id in added_ids:
176
 
            continue
177
 
        # We want a unique path for each of these, we use the file-id
178
 
        tree.add(['unique-dir/' + ie.file_id], [ie.file_id], [ie.kind])
179
 
        added_ids.add(ie.file_id)
180
 
    for old_path, new_path, file_id, ie in delta:
181
 
        if file_id in added_ids:
182
 
            continue
183
 
        tree.add(['unique-dir/' + file_id], [file_id], [ie.kind])
184
 
 
185
 
 
186
 
def apply_inventory_WT_basis(test, basis, delta, invalid_delta=True):
 
108
    # One could add 'tree._validate' here but that would cause 'early' failues 
 
109
    # as far as higher level code is concerned. Possibly adding an
 
110
    # expect_fail parameter to this function and if that is False then do a
 
111
    # validate call.
 
112
    return tree.inventory
 
113
 
 
114
 
 
115
def apply_inventory_WT_basis(self, basis, delta):
187
116
    """Apply delta to basis and return the result.
188
117
 
189
118
    This sets the parent and then calls update_basis_by_delta.
191
120
    allow safety checks made by the WT to succeed, and finally ensures that all
192
121
    items in the delta with a new path are present in the WT before calling
193
122
    update_basis_by_delta.
194
 
 
 
123
    
195
124
    :param basis: An inventory to be used as the basis.
196
125
    :param delta: The inventory delta to apply:
197
126
    :return: An inventory resulting from the application.
198
127
    """
199
 
    control = test.make_bzrdir('tree', format=test.format._matchingbzrdir)
 
128
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
200
129
    control.create_repository()
201
130
    control.create_branch()
202
 
    tree = test.format.initialize(control)
 
131
    tree = self.format.initialize(control)
203
132
    tree.lock_write()
204
133
    try:
205
 
        target_entries = _create_repo_revisions(tree.branch.repository, basis,
206
 
                                                delta, invalid_delta)
 
134
        repo = tree.branch.repository
 
135
        repo.start_write_group()
 
136
        try:
 
137
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
138
                message="", committer="foo@example.com")
 
139
            basis.revision_id = 'basis'
 
140
            repo.add_revision('basis', rev, basis)
 
141
            # Add a revision for the result, with the basis content - 
 
142
            # update_basis_by_delta doesn't check that the delta results in
 
143
            # result, and we want inconsistent deltas to get called on the
 
144
            # tree, or else the code isn't actually checked.
 
145
            rev = revision.Revision('result', timestamp=0, timezone=None,
 
146
                message="", committer="foo@example.com")
 
147
            basis.revision_id = 'result'
 
148
            repo.add_revision('result', rev, basis)
 
149
        except:
 
150
            repo.abort_write_group()
 
151
            raise
 
152
        else:
 
153
            repo.commit_write_group()
207
154
        # Set the basis state as the trees current state
208
155
        tree._write_inventory(basis)
209
156
        # This reads basis from the repo and puts it into the tree's local
210
157
        # cache, if it has one.
211
158
        tree.set_parent_ids(['basis'])
 
159
        paths = {}
 
160
        parents = set()
 
161
        for old, new, id, entry in delta:
 
162
            if None in (new, entry):
 
163
                continue
 
164
            paths[new] = (entry.file_id, entry.kind)
 
165
            parents.add(osutils.dirname(new))
 
166
        parents = osutils.minimum_path_selection(parents)
 
167
        parents.discard('')
 
168
        # Put place holders in the tree to permit adding the other entries.
 
169
        for pos, parent in enumerate(parents):
 
170
            if not tree.path2id(parent):
 
171
                # add a synthetic directory in the tree so we can can put the
 
172
                # tree0 entries in place for dirstate.
 
173
                tree.add([parent], ["id%d" % pos], ["directory"])
 
174
        if paths:
 
175
            # Many deltas may cause this mini-apply to fail, but we want to see what
 
176
            # the delta application code says, not the prep that we do to deal with 
 
177
            # limitations of dirstate's update_basis code.
 
178
            for path, (file_id, kind) in sorted(paths.items()):
 
179
                try:
 
180
                    tree.add([path], [file_id], [kind])
 
181
                except (KeyboardInterrupt, SystemExit):
 
182
                    raise
 
183
                except:
 
184
                    pass
212
185
    finally:
213
186
        tree.unlock()
214
187
    # Fresh lock, reads disk again.
215
188
    tree.lock_write()
216
189
    try:
217
190
        tree.update_basis_by_delta('result', delta)
218
 
        if not invalid_delta:
219
 
            tree._validate()
220
191
    finally:
221
192
        tree.unlock()
222
193
    # reload tree - ensure we get what was written.
223
194
    tree = tree.bzrdir.open_workingtree()
224
195
    basis_tree = tree.basis_tree()
225
196
    basis_tree.lock_read()
226
 
    test.addCleanup(basis_tree.unlock)
227
 
    basis_inv = basis_tree.root_inventory
228
 
    if target_entries:
229
 
        basis_entries = list(basis_inv.iter_entries_by_dir())
230
 
        test.assertEqual(target_entries, basis_entries)
231
 
    return basis_inv
232
 
 
233
 
 
234
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta,
235
 
                                                      invalid_delta=True):
 
197
    self.addCleanup(basis_tree.unlock)
 
198
    # Note, that if the tree does not have a local cache, the trick above of
 
199
    # setting the result as the basis, will come back to bite us. That said,
 
200
    # all the implementations in bzr do have a local cache.
 
201
    return basis_tree.inventory
 
202
 
 
203
 
 
204
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
236
205
    """Apply delta to basis and return the result.
237
206
    
238
207
    This inserts basis as a whole inventory and then uses
252
221
            rev = revision.Revision('basis', timestamp=0, timezone=None,
253
222
                message="", committer="foo@example.com")
254
223
            basis.revision_id = 'basis'
255
 
            create_texts_for_inv(repo, basis)
256
224
            repo.add_revision('basis', rev, basis)
257
 
            repo.commit_write_group()
258
225
        except:
259
226
            repo.abort_write_group()
260
227
            raise
 
228
        else:
 
229
            repo.commit_write_group()
261
230
    finally:
262
231
        repo.unlock()
263
232
    repo.lock_write()
280
249
    return repo.get_inventory('result')
281
250
 
282
251
 
283
 
class TestInventoryUpdates(TestCase):
284
 
 
285
 
    def test_creation_from_root_id(self):
286
 
        # iff a root id is passed to the constructor, a root directory is made
287
 
        inv = inventory.Inventory(root_id='tree-root')
288
 
        self.assertNotEqual(None, inv.root)
289
 
        self.assertEqual('tree-root', inv.root.file_id)
290
 
 
291
 
    def test_add_path_of_root(self):
292
 
        # if no root id is given at creation time, there is no root directory
293
 
        inv = inventory.Inventory(root_id=None)
294
 
        self.assertIs(None, inv.root)
295
 
        # add a root entry by adding its path
296
 
        ie = inv.add_path("", "directory", "my-root")
297
 
        ie.revision = 'test-rev'
298
 
        self.assertEqual("my-root", ie.file_id)
299
 
        self.assertIs(ie, inv.root)
300
 
 
301
 
    def test_add_path(self):
302
 
        inv = inventory.Inventory(root_id='tree_root')
303
 
        ie = inv.add_path('hello', 'file', 'hello-id')
304
 
        self.assertEqual('hello-id', ie.file_id)
305
 
        self.assertEqual('file', ie.kind)
306
 
 
307
 
    def test_copy(self):
308
 
        """Make sure copy() works and creates a deep copy."""
309
 
        inv = inventory.Inventory(root_id='some-tree-root')
310
 
        ie = inv.add_path('hello', 'file', 'hello-id')
311
 
        inv2 = inv.copy()
312
 
        inv.root.file_id = 'some-new-root'
313
 
        ie.name = 'file2'
314
 
        self.assertEqual('some-tree-root', inv2.root.file_id)
315
 
        self.assertEqual('hello', inv2['hello-id'].name)
316
 
 
317
 
    def test_copy_empty(self):
318
 
        """Make sure an empty inventory can be copied."""
319
 
        inv = inventory.Inventory(root_id=None)
320
 
        inv2 = inv.copy()
321
 
        self.assertIs(None, inv2.root)
322
 
 
323
 
    def test_copy_copies_root_revision(self):
324
 
        """Make sure the revision of the root gets copied."""
325
 
        inv = inventory.Inventory(root_id='someroot')
326
 
        inv.root.revision = 'therev'
327
 
        inv2 = inv.copy()
328
 
        self.assertEqual('someroot', inv2.root.file_id)
329
 
        self.assertEqual('therev', inv2.root.revision)
330
 
 
331
 
    def test_create_tree_reference(self):
332
 
        inv = inventory.Inventory('tree-root-123')
333
 
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
334
 
                              revision='rev', reference_revision='rev2'))
335
 
 
336
 
    def test_error_encoding(self):
337
 
        inv = inventory.Inventory('tree-root')
338
 
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
339
 
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
340
 
            InventoryFile('b-id', u'\u1234', 'tree-root'))
341
 
        self.assertContainsRe(str(e), r'\\u1234')
342
 
 
343
 
    def test_add_recursive(self):
344
 
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
345
 
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
346
 
        parent.children[child.file_id] = child
347
 
        inv = inventory.Inventory('tree-root')
348
 
        inv.add(parent)
349
 
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
350
 
 
351
 
 
352
 
 
353
252
class TestDeltaApplication(TestCaseWithTransport):
354
 
 
355
 
    scenarios = delta_application_scenarios()
356
253
 
357
254
    def get_empty_inventory(self, reference_inv=None):
358
255
        """Get an empty inventory.
373
270
            inv.root.revision = 'basis'
374
271
        return inv
375
272
 
376
 
    def make_file_ie(self, file_id='file-id', name='name', parent_id=None):
377
 
        ie_file = inventory.InventoryFile(file_id, name, parent_id)
378
 
        ie_file.revision = 'result'
379
 
        ie_file.text_size = 0
380
 
        ie_file.text_sha1 = ''
381
 
        return ie_file
382
 
 
383
273
    def test_empty_delta(self):
384
274
        inv = self.get_empty_inventory()
385
275
        delta = []
409
299
        file1.revision = 'result'
410
300
        file1.text_size = 0
411
301
        file1.text_sha1 = ""
412
 
        file2 = file1.copy()
413
 
        file2.name = 'path2'
 
302
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
 
303
        file2.revision = 'result'
 
304
        file2.text_size = 0
 
305
        file2.text_sha1 = ""
414
306
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
415
307
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
416
308
            inv, delta)
421
313
        file1.revision = 'result'
422
314
        file1.text_size = 0
423
315
        file1.text_sha1 = ""
424
 
        file2 = file1.copy()
425
 
        file2.file_id = 'id2'
 
316
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
 
317
        file2.revision = 'result'
 
318
        file2.text_size = 0
 
319
        file2.text_sha1 = ""
426
320
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
427
321
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
428
322
            inv, delta)
600
494
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
601
495
            inv, delta)
602
496
 
603
 
    def test_add_file(self):
604
 
        inv = self.get_empty_inventory()
605
 
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
606
 
        file1.revision = 'result'
607
 
        file1.text_size = 0
608
 
        file1.text_sha1 = ''
609
 
        delta = [(None, u'path', 'file-id', file1)]
610
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
611
 
        self.assertEqual('file-id', res_inv['file-id'].file_id)
612
 
 
613
 
    def test_remove_file(self):
614
 
        inv = self.get_empty_inventory()
615
 
        file1 = inventory.InventoryFile('file-id', 'path', inv.root.file_id)
616
 
        file1.revision = 'result'
617
 
        file1.text_size = 0
618
 
        file1.text_sha1 = ''
619
 
        inv.add(file1)
620
 
        delta = [(u'path', None, 'file-id', None)]
621
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
622
 
        self.assertEqual(None, res_inv.path2id('path'))
623
 
        self.assertRaises(errors.NoSuchId, res_inv.id2path, 'file-id')
624
 
 
625
 
    def test_rename_file(self):
626
 
        inv = self.get_empty_inventory()
627
 
        file1 = self.make_file_ie(name='path', parent_id=inv.root.file_id)
628
 
        inv.add(file1)
629
 
        file2 = self.make_file_ie(name='path2', parent_id=inv.root.file_id)
630
 
        delta = [(u'path', 'path2', 'file-id', file2)]
631
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
632
 
        self.assertEqual(None, res_inv.path2id('path'))
633
 
        self.assertEqual('file-id', res_inv.path2id('path2'))
634
 
 
635
 
    def test_replaced_at_new_path(self):
636
 
        inv = self.get_empty_inventory()
637
 
        file1 = self.make_file_ie(file_id='id1', parent_id=inv.root.file_id)
638
 
        inv.add(file1)
639
 
        file2 = self.make_file_ie(file_id='id2', parent_id=inv.root.file_id)
640
 
        delta = [(u'name', None, 'id1', None),
641
 
                 (None, u'name', 'id2', file2)]
642
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
643
 
        self.assertEqual('id2', res_inv.path2id('name'))
644
 
 
645
 
    def test_rename_dir(self):
646
 
        inv = self.get_empty_inventory()
647
 
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
648
 
        dir1.revision = 'basis'
649
 
        file1 = self.make_file_ie(parent_id='dir-id')
650
 
        inv.add(dir1)
651
 
        inv.add(file1)
652
 
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
653
 
        dir2.revision = 'result'
654
 
        delta = [('dir1', 'dir2', 'dir-id', dir2)]
655
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
656
 
        # The file should be accessible under the new path
657
 
        self.assertEqual('file-id', res_inv.path2id('dir2/name'))
658
 
 
659
 
    def test_renamed_dir_with_renamed_child(self):
660
 
        inv = self.get_empty_inventory()
661
 
        dir1 = inventory.InventoryDirectory('dir-id', 'dir1', inv.root.file_id)
662
 
        dir1.revision = 'basis'
663
 
        file1 = self.make_file_ie('file-id-1', 'name1', parent_id='dir-id')
664
 
        file2 = self.make_file_ie('file-id-2', 'name2', parent_id='dir-id')
665
 
        inv.add(dir1)
666
 
        inv.add(file1)
667
 
        inv.add(file2)
668
 
        dir2 = inventory.InventoryDirectory('dir-id', 'dir2', inv.root.file_id)
669
 
        dir2.revision = 'result'
670
 
        file2b = self.make_file_ie('file-id-2', 'name2', inv.root.file_id)
671
 
        delta = [('dir1', 'dir2', 'dir-id', dir2),
672
 
                 ('dir1/name2', 'name2', 'file-id-2', file2b)]
673
 
        res_inv = self.apply_delta(self, inv, delta, invalid_delta=False)
674
 
        # The file should be accessible under the new path
675
 
        self.assertEqual('file-id-1', res_inv.path2id('dir2/name1'))
676
 
        self.assertEqual(None, res_inv.path2id('dir2/name2'))
677
 
        self.assertEqual('file-id-2', res_inv.path2id('name2'))
678
 
 
679
 
    def test_is_root(self):
680
 
        """Ensure our root-checking code is accurate."""
681
 
        inv = inventory.Inventory('TREE_ROOT')
682
 
        self.assertTrue(inv.is_root('TREE_ROOT'))
683
 
        self.assertFalse(inv.is_root('booga'))
684
 
        inv.root.file_id = 'booga'
685
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
686
 
        self.assertTrue(inv.is_root('booga'))
687
 
        # works properly even if no root is set
688
 
        inv.root = None
689
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
690
 
        self.assertFalse(inv.is_root('booga'))
691
 
 
692
 
    def test_entries_for_empty_inventory(self):
693
 
        """Test that entries() will not fail for an empty inventory"""
694
 
        inv = Inventory(root_id=None)
695
 
        self.assertEqual([], inv.entries())
696
 
 
697
497
 
698
498
class TestInventoryEntry(TestCase):
699
499
 
711
511
 
712
512
    def test_dir_detect_changes(self):
713
513
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
514
        left.text_sha1 = 123
 
515
        left.executable = True
 
516
        left.symlink_target='foo'
714
517
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
518
        right.text_sha1 = 321
 
519
        right.symlink_target='bar'
715
520
        self.assertEqual((False, False), left.detect_changes(right))
716
521
        self.assertEqual((False, False), right.detect_changes(left))
717
522
 
731
536
 
732
537
    def test_symlink_detect_changes(self):
733
538
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
539
        left.text_sha1 = 123
 
540
        left.executable = True
734
541
        left.symlink_target='foo'
735
542
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
543
        right.text_sha1 = 321
736
544
        right.symlink_target='foo'
737
545
        self.assertEqual((False, False), left.detect_changes(right))
738
546
        self.assertEqual((False, False), right.detect_changes(left))
742
550
 
743
551
    def test_file_has_text(self):
744
552
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
745
 
        self.assertTrue(file.has_text())
 
553
        self.failUnless(file.has_text())
746
554
 
747
555
    def test_directory_has_text(self):
748
556
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
749
 
        self.assertFalse(dir.has_text())
 
557
        self.failIf(dir.has_text())
750
558
 
751
559
    def test_link_has_text(self):
752
560
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
753
 
        self.assertFalse(link.has_text())
 
561
        self.failIf(link.has_text())
754
562
 
755
563
    def test_make_entry(self):
756
564
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
834
642
        self.assertEqual(expected_change, change)
835
643
 
836
644
 
837
 
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
645
class TestCHKInventory(TestCaseWithTransport):
838
646
 
839
647
    def get_chk_bytes(self):
840
 
        factory = groupcompress.make_pack_factory(True, True, 1)
841
 
        trans = self.get_transport('')
842
 
        return factory(trans)
 
648
        # The easiest way to get a CHK store is a development6 repository and
 
649
        # then work with the chk_bytes attribute directly.
 
650
        repo = self.make_repository(".", format="development6-rich-root")
 
651
        repo.lock_write()
 
652
        self.addCleanup(repo.unlock)
 
653
        repo.start_write_group()
 
654
        self.addCleanup(repo.abort_write_group)
 
655
        return repo.chk_bytes
843
656
 
844
657
    def read_bytes(self, chk_bytes, key):
845
658
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
1101
914
        delta = [("", None, base_inv.root.file_id, None),
1102
915
            (None, "",  "myrootid", inv.root)]
1103
916
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1104
 
        self.assertEqual(reference_inv.root, new_inv.root)
 
917
        self.assertEquals(reference_inv.root, new_inv.root)
1105
918
 
1106
919
    def test_create_by_apply_delta_empty_add_child(self):
1107
920
        inv = Inventory()
1310
1123
        self.assertIsInstance(ie2.name, unicode)
1311
1124
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1312
1125
                         inv._bytes_to_utf8name_key(bytes))
1313
 
 
1314
 
    def make_basic_utf8_inventory(self):
1315
 
        inv = Inventory()
1316
 
        inv.revision_id = "revid"
1317
 
        inv.root.revision = "rootrev"
1318
 
        root_id = inv.root.file_id
1319
 
        inv.add(InventoryFile("fileid", u'f\xefle', root_id))
1320
 
        inv["fileid"].revision = "filerev"
1321
 
        inv["fileid"].text_sha1 = "ffff"
1322
 
        inv["fileid"].text_size = 0
1323
 
        inv.add(InventoryDirectory("dirid", u'dir-\N{EURO SIGN}', root_id))
1324
 
        inv.add(InventoryFile("childid", u'ch\xefld', "dirid"))
1325
 
        inv["childid"].revision = "filerev"
1326
 
        inv["childid"].text_sha1 = "ffff"
1327
 
        inv["childid"].text_size = 0
1328
 
        chk_bytes = self.get_chk_bytes()
1329
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1330
 
        bytes = ''.join(chk_inv.to_lines())
1331
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1332
 
 
1333
 
    def test__preload_handles_utf8(self):
1334
 
        new_inv = self.make_basic_utf8_inventory()
1335
 
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
1336
 
        self.assertFalse(new_inv._fully_cached)
1337
 
        new_inv._preload_cache()
1338
 
        self.assertEqual(
1339
 
            sorted([new_inv.root_id, "fileid", "dirid", "childid"]),
1340
 
            sorted(new_inv._fileid_to_entry_cache.keys()))
1341
 
        ie_root = new_inv._fileid_to_entry_cache[new_inv.root_id]
1342
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1343
 
                         sorted(ie_root._children.keys()))
1344
 
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
1345
 
        self.assertEqual([u'ch\xefld'], sorted(ie_dir._children.keys()))
1346
 
 
1347
 
    def test__preload_populates_cache(self):
1348
 
        inv = Inventory()
1349
 
        inv.revision_id = "revid"
1350
 
        inv.root.revision = "rootrev"
1351
 
        root_id = inv.root.file_id
1352
 
        inv.add(InventoryFile("fileid", "file", root_id))
1353
 
        inv["fileid"].revision = "filerev"
1354
 
        inv["fileid"].executable = True
1355
 
        inv["fileid"].text_sha1 = "ffff"
1356
 
        inv["fileid"].text_size = 1
1357
 
        inv.add(InventoryDirectory("dirid", "dir", root_id))
1358
 
        inv.add(InventoryFile("childid", "child", "dirid"))
1359
 
        inv["childid"].revision = "filerev"
1360
 
        inv["childid"].executable = False
1361
 
        inv["childid"].text_sha1 = "dddd"
1362
 
        inv["childid"].text_size = 1
1363
 
        chk_bytes = self.get_chk_bytes()
1364
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1365
 
        bytes = ''.join(chk_inv.to_lines())
1366
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1367
 
        self.assertEqual({}, new_inv._fileid_to_entry_cache)
1368
 
        self.assertFalse(new_inv._fully_cached)
1369
 
        new_inv._preload_cache()
1370
 
        self.assertEqual(
1371
 
            sorted([root_id, "fileid", "dirid", "childid"]),
1372
 
            sorted(new_inv._fileid_to_entry_cache.keys()))
1373
 
        self.assertTrue(new_inv._fully_cached)
1374
 
        ie_root = new_inv._fileid_to_entry_cache[root_id]
1375
 
        self.assertEqual(['dir', 'file'], sorted(ie_root._children.keys()))
1376
 
        ie_dir = new_inv._fileid_to_entry_cache['dirid']
1377
 
        self.assertEqual(['child'], sorted(ie_dir._children.keys()))
1378
 
 
1379
 
    def test__preload_handles_partially_evaluated_inventory(self):
1380
 
        new_inv = self.make_basic_utf8_inventory()
1381
 
        ie = new_inv[new_inv.root_id]
1382
 
        self.assertIs(None, ie._children)
1383
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1384
 
                         sorted(ie.children.keys()))
1385
 
        # Accessing .children loads _children
1386
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1387
 
                         sorted(ie._children.keys()))
1388
 
        new_inv._preload_cache()
1389
 
        # No change
1390
 
        self.assertEqual([u'dir-\N{EURO SIGN}', u'f\xefle'],
1391
 
                         sorted(ie._children.keys()))
1392
 
        ie_dir = new_inv["dirid"]
1393
 
        self.assertEqual([u'ch\xefld'],
1394
 
                         sorted(ie_dir._children.keys()))
1395
 
 
1396
 
    def test_filter_change_in_renamed_subfolder(self):
1397
 
        inv = Inventory('tree-root')
1398
 
        src_ie = inv.add_path('src', 'directory', 'src-id')
1399
 
        inv.add_path('src/sub/', 'directory', 'sub-id')
1400
 
        a_ie = inv.add_path('src/sub/a', 'file', 'a-id')
1401
 
        a_ie.text_sha1 = osutils.sha_string('content\n')
1402
 
        a_ie.text_size = len('content\n')
1403
 
        chk_bytes = self.get_chk_bytes()
1404
 
        inv = CHKInventory.from_inventory(chk_bytes, inv)
1405
 
        inv = inv.create_by_apply_delta([
1406
 
            ("src/sub/a", "src/sub/a", "a-id", a_ie),
1407
 
            ("src", "src2", "src-id", src_ie),
1408
 
            ], 'new-rev-2')
1409
 
        new_inv = inv.filter(['a-id', 'src-id'])
1410
 
        self.assertEqual([
1411
 
            ('', 'tree-root'),
1412
 
            ('src', 'src-id'),
1413
 
            ('src/sub', 'sub-id'),
1414
 
            ('src/sub/a', 'a-id'),
1415
 
            ], [(path, ie.file_id) for path, ie in new_inv.iter_entries()])
1416
 
 
1417
 
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1418
 
 
1419
 
    def get_chk_bytes(self):
1420
 
        factory = groupcompress.make_pack_factory(True, True, 1)
1421
 
        trans = self.get_transport('')
1422
 
        return factory(trans)
1423
 
 
1424
 
    def make_dir(self, inv, name, parent_id):
1425
 
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1426
 
 
1427
 
    def make_file(self, inv, name, parent_id, content='content\n'):
1428
 
        ie = inv.make_entry('file', name, parent_id, name + '-id')
1429
 
        ie.text_sha1 = osutils.sha_string(content)
1430
 
        ie.text_size = len(content)
1431
 
        inv.add(ie)
1432
 
 
1433
 
    def make_simple_inventory(self):
1434
 
        inv = Inventory('TREE_ROOT')
1435
 
        inv.revision_id = "revid"
1436
 
        inv.root.revision = "rootrev"
1437
 
        # /                 TREE_ROOT
1438
 
        # dir1/             dir1-id
1439
 
        #   sub-file1       sub-file1-id
1440
 
        #   sub-file2       sub-file2-id
1441
 
        #   sub-dir1/       sub-dir1-id
1442
 
        #     subsub-file1  subsub-file1-id
1443
 
        # dir2/             dir2-id
1444
 
        #   sub2-file1      sub2-file1-id
1445
 
        # top               top-id
1446
 
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
1447
 
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
1448
 
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
1449
 
        self.make_file(inv, 'top', 'TREE_ROOT')
1450
 
        self.make_file(inv, 'sub-file1', 'dir1-id')
1451
 
        self.make_file(inv, 'sub-file2', 'dir1-id')
1452
 
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1453
 
        self.make_file(inv, 'sub2-file1', 'dir2-id')
1454
 
        chk_bytes = self.get_chk_bytes()
1455
 
        #  use a small maximum_size to force internal paging structures
1456
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1457
 
                        maximum_size=100,
1458
 
                        search_key_name='hash-255-way')
1459
 
        bytes = ''.join(chk_inv.to_lines())
1460
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1461
 
 
1462
 
    def assert_Getitems(self, expected_fileids, inv, file_ids):
1463
 
        self.assertEqual(sorted(expected_fileids),
1464
 
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1465
 
 
1466
 
    def assertExpand(self, all_ids, inv, file_ids):
1467
 
        (val_all_ids,
1468
 
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1469
 
        self.assertEqual(set(all_ids), val_all_ids)
1470
 
        entries = inv._getitems(val_all_ids)
1471
 
        expected_children = {}
1472
 
        for entry in entries:
1473
 
            s = expected_children.setdefault(entry.parent_id, [])
1474
 
            s.append(entry.file_id)
1475
 
        val_children = dict((k, sorted(v)) for k, v
1476
 
                            in val_children.iteritems())
1477
 
        expected_children = dict((k, sorted(v)) for k, v
1478
 
                            in expected_children.iteritems())
1479
 
        self.assertEqual(expected_children, val_children)
1480
 
 
1481
 
    def test_make_simple_inventory(self):
1482
 
        inv = self.make_simple_inventory()
1483
 
        layout = []
1484
 
        for path, entry in inv.iter_entries_by_dir():
1485
 
            layout.append((path, entry.file_id))
1486
 
        self.assertEqual([
1487
 
            ('', 'TREE_ROOT'),
1488
 
            ('dir1', 'dir1-id'),
1489
 
            ('dir2', 'dir2-id'),
1490
 
            ('top', 'top-id'),
1491
 
            ('dir1/sub-dir1', 'sub-dir1-id'),
1492
 
            ('dir1/sub-file1', 'sub-file1-id'),
1493
 
            ('dir1/sub-file2', 'sub-file2-id'),
1494
 
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1495
 
            ('dir2/sub2-file1', 'sub2-file1-id'),
1496
 
            ], layout)
1497
 
 
1498
 
    def test__getitems(self):
1499
 
        inv = self.make_simple_inventory()
1500
 
        # Reading from disk
1501
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1502
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1503
 
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1504
 
        # From cache
1505
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1506
 
        # Mixed
1507
 
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1508
 
                             ['dir1-id', 'sub-file2-id'])
1509
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1510
 
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1511
 
 
1512
 
    def test_single_file(self):
1513
 
        inv = self.make_simple_inventory()
1514
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1515
 
 
1516
 
    def test_get_all_parents(self):
1517
 
        inv = self.make_simple_inventory()
1518
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1519
 
                           'subsub-file1-id',
1520
 
                          ], inv, ['subsub-file1-id'])
1521
 
 
1522
 
    def test_get_children(self):
1523
 
        inv = self.make_simple_inventory()
1524
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1525
 
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1526
 
                          ], inv, ['dir1-id'])
1527
 
 
1528
 
    def test_from_root(self):
1529
 
        inv = self.make_simple_inventory()
1530
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1531
 
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1532
 
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1533
 
 
1534
 
    def test_top_level_file(self):
1535
 
        inv = self.make_simple_inventory()
1536
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1537
 
 
1538
 
    def test_subsub_file(self):
1539
 
        inv = self.make_simple_inventory()
1540
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1541
 
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
1542
 
 
1543
 
    def test_sub_and_root(self):
1544
 
        inv = self.make_simple_inventory()
1545
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1546
 
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
1547
 
 
1548
 
 
1549
 
class TestMutableInventoryFromTree(TestCaseWithTransport):
1550
 
 
1551
 
    def test_empty(self):
1552
 
        repository = self.make_repository('.')
1553
 
        tree = repository.revision_tree(revision.NULL_REVISION)
1554
 
        inv = mutable_inventory_from_tree(tree)
1555
 
        self.assertEqual(revision.NULL_REVISION, inv.revision_id)
1556
 
        self.assertEqual(0, len(inv))
1557
 
 
1558
 
    def test_some_files(self):
1559
 
        wt = self.make_branch_and_tree('.')
1560
 
        self.build_tree(['a'])
1561
 
        wt.add(['a'], ['thefileid'])
1562
 
        revid = wt.commit("commit")
1563
 
        tree = wt.branch.repository.revision_tree(revid)
1564
 
        inv = mutable_inventory_from_tree(tree)
1565
 
        self.assertEqual(revid, inv.revision_id)
1566
 
        self.assertEqual(2, len(inv))
1567
 
        self.assertEqual("a", inv['thefileid'].name)
1568
 
        # The inventory should be mutable and independent of
1569
 
        # the original tree
1570
 
        self.assertFalse(tree.root_inventory['thefileid'].executable)
1571
 
        inv['thefileid'].executable = True
1572
 
        self.assertFalse(tree.root_inventory['thefileid'].executable)