~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Jelmer Vernooij
  • Date: 2010-09-01 09:26:50 UTC
  • mto: This revision was merged to the branch mainline in revision 5414.
  • Revision ID: jelmer@samba.org-20100901092650-o0t1vgfiu0h6rzhh
Use cvar.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
from bzrlib.selftest import TestBase
18
 
 
19
 
from bzrlib.inventory import Inventory, InventoryEntry
20
 
 
21
 
 
22
 
class TestIsWithin(TestBase):
23
 
    def runTest(self):
24
 
        from bzrlib.osutils import is_inside_any
25
 
        
26
 
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
27
 
                         (['src'], 'src/foo.c'),
28
 
                         (['src'], 'src'),
29
 
                         ]:
30
 
            self.assert_(is_inside_any(dirs, fn))
31
 
            
32
 
        for dirs, fn in [(['src'], 'srccontrol'),
33
 
                         (['src'], 'srccontrol/foo')]:
34
 
            self.assertFalse(is_inside_any(dirs, fn))
35
 
            
36
 
            
37
 
            
38
 
class TestInventoryIds(TestBase):
39
 
    def runTest(self):
40
 
        """Test detection of files within selected directories."""
41
 
        inv = Inventory()
42
 
        
43
 
        for args in [('src', 'directory', 'src-id'), 
44
 
                     ('doc', 'directory', 'doc-id'), 
45
 
                     ('src/hello.c', 'file'),
46
 
                     ('src/bye.c', 'file', 'bye-id'),
47
 
                     ('Makefile', 'file')]:
48
 
            inv.add_path(*args)
49
 
            
50
 
        self.assertEqual(inv.path2id('src'), 'src-id')
51
 
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
52
 
        
53
 
        self.assert_('src-id' in inv)
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from bzrlib import (
 
19
    chk_map,
 
20
    groupcompress,
 
21
    errors,
 
22
    inventory,
 
23
    osutils,
 
24
    repository,
 
25
    revision,
 
26
    tests,
 
27
    )
 
28
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
 
29
    InventoryDirectory, InventoryEntry, TreeReference)
 
30
from bzrlib.tests import (
 
31
    TestCase,
 
32
    TestCaseWithTransport,
 
33
    condition_isinstance,
 
34
    multiply_tests,
 
35
    split_suite_by_condition,
 
36
    )
 
37
from bzrlib.tests.per_workingtree import workingtree_formats
 
38
 
 
39
 
 
40
def load_tests(standard_tests, module, loader):
 
41
    """Parameterise some inventory tests."""
 
42
    to_adapt, result = split_suite_by_condition(standard_tests,
 
43
        condition_isinstance(TestDeltaApplication))
 
44
    scenarios = [
 
45
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
46
        ]
 
47
    # Working tree basis delta application
 
48
    # Repository add_inv_by_delta.
 
49
    # Reduce form of the per_repository test logic - that logic needs to be
 
50
    # be able to get /just/ repositories whereas these tests are fine with
 
51
    # just creating trees.
 
52
    formats = set()
 
53
    for _, format in repository.format_registry.iteritems():
 
54
        scenarios.append((str(format.__name__), {
 
55
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
56
            'format':format}))
 
57
    for format in workingtree_formats():
 
58
        scenarios.append(
 
59
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
60
            'apply_delta':apply_inventory_WT_basis,
 
61
            'format':format}))
 
62
        scenarios.append(
 
63
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
64
            'apply_delta':apply_inventory_WT,
 
65
            'format':format}))
 
66
    return multiply_tests(to_adapt, scenarios, result)
 
67
 
 
68
 
 
69
def create_texts_for_inv(repo, inv):
 
70
    for path, ie in inv.iter_entries():
 
71
        if ie.text_size:
 
72
            lines = ['a' * ie.text_size]
 
73
        else:
 
74
            lines = []
 
75
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
76
    
 
77
def apply_inventory_Inventory(self, basis, delta):
 
78
    """Apply delta to basis and return the result.
 
79
    
 
80
    :param basis: An inventory to be used as the basis.
 
81
    :param delta: The inventory delta to apply:
 
82
    :return: An inventory resulting from the application.
 
83
    """
 
84
    basis.apply_delta(delta)
 
85
    return basis
 
86
 
 
87
 
 
88
def apply_inventory_WT(self, basis, delta):
 
89
    """Apply delta to basis and return the result.
 
90
 
 
91
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
92
    
 
93
    :param basis: An inventory to be used as the basis.
 
94
    :param delta: The inventory delta to apply:
 
95
    :return: An inventory resulting from the application.
 
96
    """
 
97
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
98
    control.create_repository()
 
99
    control.create_branch()
 
100
    tree = self.format.initialize(control)
 
101
    tree.lock_write()
 
102
    try:
 
103
        tree._write_inventory(basis)
 
104
    finally:
 
105
        tree.unlock()
 
106
    # Fresh object, reads disk again.
 
107
    tree = tree.bzrdir.open_workingtree()
 
108
    tree.lock_write()
 
109
    try:
 
110
        tree.apply_inventory_delta(delta)
 
111
    finally:
 
112
        tree.unlock()
 
113
    # reload tree - ensure we get what was written.
 
114
    tree = tree.bzrdir.open_workingtree()
 
115
    tree.lock_read()
 
116
    self.addCleanup(tree.unlock)
 
117
    # One could add 'tree._validate' here but that would cause 'early' failues 
 
118
    # as far as higher level code is concerned. Possibly adding an
 
119
    # expect_fail parameter to this function and if that is False then do a
 
120
    # validate call.
 
121
    return tree.inventory
 
122
 
 
123
 
 
124
def apply_inventory_WT_basis(self, basis, delta):
 
125
    """Apply delta to basis and return the result.
 
126
 
 
127
    This sets the parent and then calls update_basis_by_delta.
 
128
    It also puts the basis in the repository under both 'basis' and 'result' to
 
129
    allow safety checks made by the WT to succeed, and finally ensures that all
 
130
    items in the delta with a new path are present in the WT before calling
 
131
    update_basis_by_delta.
 
132
    
 
133
    :param basis: An inventory to be used as the basis.
 
134
    :param delta: The inventory delta to apply:
 
135
    :return: An inventory resulting from the application.
 
136
    """
 
137
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
138
    control.create_repository()
 
139
    control.create_branch()
 
140
    tree = self.format.initialize(control)
 
141
    tree.lock_write()
 
142
    try:
 
143
        repo = tree.branch.repository
 
144
        repo.start_write_group()
 
145
        try:
 
146
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
147
                message="", committer="foo@example.com")
 
148
            basis.revision_id = 'basis'
 
149
            create_texts_for_inv(tree.branch.repository, basis)
 
150
            repo.add_revision('basis', rev, basis)
 
151
            # Add a revision for the result, with the basis content - 
 
152
            # update_basis_by_delta doesn't check that the delta results in
 
153
            # result, and we want inconsistent deltas to get called on the
 
154
            # tree, or else the code isn't actually checked.
 
155
            rev = revision.Revision('result', timestamp=0, timezone=None,
 
156
                message="", committer="foo@example.com")
 
157
            basis.revision_id = 'result'
 
158
            repo.add_revision('result', rev, basis)
 
159
            repo.commit_write_group()
 
160
        except:
 
161
            repo.abort_write_group()
 
162
            raise
 
163
        # Set the basis state as the trees current state
 
164
        tree._write_inventory(basis)
 
165
        # This reads basis from the repo and puts it into the tree's local
 
166
        # cache, if it has one.
 
167
        tree.set_parent_ids(['basis'])
 
168
        paths = {}
 
169
        parents = set()
 
170
        for old, new, id, entry in delta:
 
171
            if None in (new, entry):
 
172
                continue
 
173
            paths[new] = (entry.file_id, entry.kind)
 
174
            parents.add(osutils.dirname(new))
 
175
        parents = osutils.minimum_path_selection(parents)
 
176
        parents.discard('')
 
177
        # Put place holders in the tree to permit adding the other entries.
 
178
        for pos, parent in enumerate(parents):
 
179
            if not tree.path2id(parent):
 
180
                # add a synthetic directory in the tree so we can can put the
 
181
                # tree0 entries in place for dirstate.
 
182
                tree.add([parent], ["id%d" % pos], ["directory"])
 
183
        if paths:
 
184
            # Many deltas may cause this mini-apply to fail, but we want to see what
 
185
            # the delta application code says, not the prep that we do to deal with 
 
186
            # limitations of dirstate's update_basis code.
 
187
            for path, (file_id, kind) in sorted(paths.items()):
 
188
                try:
 
189
                    tree.add([path], [file_id], [kind])
 
190
                except (KeyboardInterrupt, SystemExit):
 
191
                    raise
 
192
                except:
 
193
                    pass
 
194
    finally:
 
195
        tree.unlock()
 
196
    # Fresh lock, reads disk again.
 
197
    tree.lock_write()
 
198
    try:
 
199
        tree.update_basis_by_delta('result', delta)
 
200
    finally:
 
201
        tree.unlock()
 
202
    # reload tree - ensure we get what was written.
 
203
    tree = tree.bzrdir.open_workingtree()
 
204
    basis_tree = tree.basis_tree()
 
205
    basis_tree.lock_read()
 
206
    self.addCleanup(basis_tree.unlock)
 
207
    # Note, that if the tree does not have a local cache, the trick above of
 
208
    # setting the result as the basis, will come back to bite us. That said,
 
209
    # all the implementations in bzr do have a local cache.
 
210
    return basis_tree.inventory
 
211
 
 
212
 
 
213
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
214
    """Apply delta to basis and return the result.
 
215
    
 
216
    This inserts basis as a whole inventory and then uses
 
217
    add_inventory_by_delta to add delta.
 
218
 
 
219
    :param basis: An inventory to be used as the basis.
 
220
    :param delta: The inventory delta to apply:
 
221
    :return: An inventory resulting from the application.
 
222
    """
 
223
    format = self.format()
 
224
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
 
225
    repo = format.initialize(control)
 
226
    repo.lock_write()
 
227
    try:
 
228
        repo.start_write_group()
 
229
        try:
 
230
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
231
                message="", committer="foo@example.com")
 
232
            basis.revision_id = 'basis'
 
233
            create_texts_for_inv(repo, basis)
 
234
            repo.add_revision('basis', rev, basis)
 
235
            repo.commit_write_group()
 
236
        except:
 
237
            repo.abort_write_group()
 
238
            raise
 
239
    finally:
 
240
        repo.unlock()
 
241
    repo.lock_write()
 
242
    try:
 
243
        repo.start_write_group()
 
244
        try:
 
245
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
246
                'result', ['basis'])
 
247
        except:
 
248
            repo.abort_write_group()
 
249
            raise
 
250
        else:
 
251
            repo.commit_write_group()
 
252
    finally:
 
253
        repo.unlock()
 
254
    # Fresh lock, reads disk again.
 
255
    repo = repo.bzrdir.open_repository()
 
256
    repo.lock_read()
 
257
    self.addCleanup(repo.unlock)
 
258
    return repo.get_inventory('result')
 
259
 
 
260
 
 
261
class TestInventoryUpdates(TestCase):
 
262
 
 
263
    def test_creation_from_root_id(self):
 
264
        # iff a root id is passed to the constructor, a root directory is made
 
265
        inv = inventory.Inventory(root_id='tree-root')
 
266
        self.assertNotEqual(None, inv.root)
 
267
        self.assertEqual('tree-root', inv.root.file_id)
 
268
 
 
269
    def test_add_path_of_root(self):
 
270
        # if no root id is given at creation time, there is no root directory
 
271
        inv = inventory.Inventory(root_id=None)
 
272
        self.assertIs(None, inv.root)
 
273
        # add a root entry by adding its path
 
274
        ie = inv.add_path("", "directory", "my-root")
 
275
        ie.revision = 'test-rev'
 
276
        self.assertEqual("my-root", ie.file_id)
 
277
        self.assertIs(ie, inv.root)
 
278
 
 
279
    def test_add_path(self):
 
280
        inv = inventory.Inventory(root_id='tree_root')
 
281
        ie = inv.add_path('hello', 'file', 'hello-id')
 
282
        self.assertEqual('hello-id', ie.file_id)
 
283
        self.assertEqual('file', ie.kind)
 
284
 
 
285
    def test_copy(self):
 
286
        """Make sure copy() works and creates a deep copy."""
 
287
        inv = inventory.Inventory(root_id='some-tree-root')
 
288
        ie = inv.add_path('hello', 'file', 'hello-id')
 
289
        inv2 = inv.copy()
 
290
        inv.root.file_id = 'some-new-root'
 
291
        ie.name = 'file2'
 
292
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
293
        self.assertEqual('hello', inv2['hello-id'].name)
 
294
 
 
295
    def test_copy_empty(self):
 
296
        """Make sure an empty inventory can be copied."""
 
297
        inv = inventory.Inventory(root_id=None)
 
298
        inv2 = inv.copy()
 
299
        self.assertIs(None, inv2.root)
 
300
 
 
301
    def test_copy_copies_root_revision(self):
 
302
        """Make sure the revision of the root gets copied."""
 
303
        inv = inventory.Inventory(root_id='someroot')
 
304
        inv.root.revision = 'therev'
 
305
        inv2 = inv.copy()
 
306
        self.assertEquals('someroot', inv2.root.file_id)
 
307
        self.assertEquals('therev', inv2.root.revision)
 
308
 
 
309
    def test_create_tree_reference(self):
 
310
        inv = inventory.Inventory('tree-root-123')
 
311
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
312
                              revision='rev', reference_revision='rev2'))
 
313
 
 
314
    def test_error_encoding(self):
 
315
        inv = inventory.Inventory('tree-root')
 
316
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
317
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
318
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
319
        self.assertContainsRe(str(e), r'\\u1234')
 
320
 
 
321
    def test_add_recursive(self):
 
322
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
323
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
324
        parent.children[child.file_id] = child
 
325
        inv = inventory.Inventory('tree-root')
 
326
        inv.add(parent)
 
327
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
328
 
 
329
 
 
330
 
 
331
class TestDeltaApplication(TestCaseWithTransport):
 
332
 
 
333
    def get_empty_inventory(self, reference_inv=None):
 
334
        """Get an empty inventory.
 
335
 
 
336
        Note that tests should not depend on the revision of the root for
 
337
        setting up test conditions, as it has to be flexible to accomodate non
 
338
        rich root repositories.
 
339
 
 
340
        :param reference_inv: If not None, get the revision for the root from
 
341
            this inventory. This is useful for dealing with older repositories
 
342
            that routinely discarded the root entry data. If None, the root's
 
343
            revision is set to 'basis'.
 
344
        """
 
345
        inv = inventory.Inventory()
 
346
        if reference_inv is not None:
 
347
            inv.root.revision = reference_inv.root.revision
 
348
        else:
 
349
            inv.root.revision = 'basis'
 
350
        return inv
 
351
 
 
352
    def test_empty_delta(self):
 
353
        inv = self.get_empty_inventory()
 
354
        delta = []
 
355
        inv = self.apply_delta(self, inv, delta)
 
356
        inv2 = self.get_empty_inventory(inv)
 
357
        self.assertEqual([], inv2._make_delta(inv))
 
358
 
 
359
    def test_None_file_id(self):
 
360
        inv = self.get_empty_inventory()
 
361
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
362
        dir1.revision = 'result'
 
363
        delta = [(None, u'dir1', None, dir1)]
 
364
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
365
            inv, delta)
 
366
 
 
367
    def test_unicode_file_id(self):
 
368
        inv = self.get_empty_inventory()
 
369
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
370
        dir1.revision = 'result'
 
371
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
372
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
373
            inv, delta)
 
374
 
 
375
    def test_repeated_file_id(self):
 
376
        inv = self.get_empty_inventory()
 
377
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
378
        file1.revision = 'result'
 
379
        file1.text_size = 0
 
380
        file1.text_sha1 = ""
 
381
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
 
382
        file2.revision = 'result'
 
383
        file2.text_size = 0
 
384
        file2.text_sha1 = ""
 
385
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
386
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
387
            inv, delta)
 
388
 
 
389
    def test_repeated_new_path(self):
 
390
        inv = self.get_empty_inventory()
 
391
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
392
        file1.revision = 'result'
 
393
        file1.text_size = 0
 
394
        file1.text_sha1 = ""
 
395
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
 
396
        file2.revision = 'result'
 
397
        file2.text_size = 0
 
398
        file2.text_sha1 = ""
 
399
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
400
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
401
            inv, delta)
 
402
 
 
403
    def test_repeated_old_path(self):
 
404
        inv = self.get_empty_inventory()
 
405
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
406
        file1.revision = 'result'
 
407
        file1.text_size = 0
 
408
        file1.text_sha1 = ""
 
409
        # We can't *create* a source inventory with the same path, but
 
410
        # a badly generated partial delta might claim the same source twice.
 
411
        # This would be buggy in two ways: the path is repeated in the delta,
 
412
        # And the path for one of the file ids doesn't match the source
 
413
        # location. Alternatively, we could have a repeated fileid, but that
 
414
        # is separately checked for.
 
415
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
416
        file2.revision = 'result'
 
417
        file2.text_size = 0
 
418
        file2.text_sha1 = ""
 
419
        inv.add(file1)
 
420
        inv.add(file2)
 
421
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
422
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
423
            inv, delta)
 
424
 
 
425
    def test_mismatched_id_entry_id(self):
 
426
        inv = self.get_empty_inventory()
 
427
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
428
        file1.revision = 'result'
 
429
        file1.text_size = 0
 
430
        file1.text_sha1 = ""
 
431
        delta = [(None, u'path', 'id', file1)]
 
432
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
433
            inv, delta)
 
434
 
 
435
    def test_mismatched_new_path_entry_None(self):
 
436
        inv = self.get_empty_inventory()
 
437
        delta = [(None, u'path', 'id', None)]
 
438
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
439
            inv, delta)
 
440
 
 
441
    def test_mismatched_new_path_None_entry(self):
 
442
        inv = self.get_empty_inventory()
 
443
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
444
        file1.revision = 'result'
 
445
        file1.text_size = 0
 
446
        file1.text_sha1 = ""
 
447
        delta = [(u"path", None, 'id1', file1)]
 
448
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
449
            inv, delta)
 
450
 
 
451
    def test_parent_is_not_directory(self):
 
452
        inv = self.get_empty_inventory()
 
453
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
454
        file1.revision = 'result'
 
455
        file1.text_size = 0
 
456
        file1.text_sha1 = ""
 
457
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
458
        file2.revision = 'result'
 
459
        file2.text_size = 0
 
460
        file2.text_sha1 = ""
 
461
        inv.add(file1)
 
462
        delta = [(None, u'path/path2', 'id2', file2)]
 
463
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
464
            inv, delta)
 
465
 
 
466
    def test_parent_is_missing(self):
 
467
        inv = self.get_empty_inventory()
 
468
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
469
        file2.revision = 'result'
 
470
        file2.text_size = 0
 
471
        file2.text_sha1 = ""
 
472
        delta = [(None, u'path/path2', 'id2', file2)]
 
473
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
474
            inv, delta)
 
475
 
 
476
    def test_new_parent_path_has_wrong_id(self):
 
477
        inv = self.get_empty_inventory()
 
478
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
479
        parent1.revision = 'result'
 
480
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
481
        parent2.revision = 'result'
 
482
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
483
        file1.revision = 'result'
 
484
        file1.text_size = 0
 
485
        file1.text_sha1 = ""
 
486
        inv.add(parent1)
 
487
        inv.add(parent2)
 
488
        # This delta claims that file1 is at dir/path, but actually its at
 
489
        # dir2/path if you follow the inventory parent structure.
 
490
        delta = [(None, u'dir/path', 'id', file1)]
 
491
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
492
            inv, delta)
 
493
 
 
494
    def test_old_parent_path_is_wrong(self):
 
495
        inv = self.get_empty_inventory()
 
496
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
497
        parent1.revision = 'result'
 
498
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
499
        parent2.revision = 'result'
 
500
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
501
        file1.revision = 'result'
 
502
        file1.text_size = 0
 
503
        file1.text_sha1 = ""
 
504
        inv.add(parent1)
 
505
        inv.add(parent2)
 
506
        inv.add(file1)
 
507
        # This delta claims that file1 was at dir/path, but actually it was at
 
508
        # dir2/path if you follow the inventory parent structure.
 
509
        delta = [(u'dir/path', None, 'id', None)]
 
510
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
511
            inv, delta)
 
512
 
 
513
    def test_old_parent_path_is_for_other_id(self):
 
514
        inv = self.get_empty_inventory()
 
515
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
516
        parent1.revision = 'result'
 
517
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
518
        parent2.revision = 'result'
 
519
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
520
        file1.revision = 'result'
 
521
        file1.text_size = 0
 
522
        file1.text_sha1 = ""
 
523
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
524
        file2.revision = 'result'
 
525
        file2.text_size = 0
 
526
        file2.text_sha1 = ""
 
527
        inv.add(parent1)
 
528
        inv.add(parent2)
 
529
        inv.add(file1)
 
530
        inv.add(file2)
 
531
        # This delta claims that file1 was at dir/path, but actually it was at
 
532
        # dir2/path if you follow the inventory parent structure. At dir/path
 
533
        # is another entry we should not delete.
 
534
        delta = [(u'dir/path', None, 'id', None)]
 
535
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
536
            inv, delta)
 
537
 
 
538
    def test_add_existing_id_new_path(self):
 
539
        inv = self.get_empty_inventory()
 
540
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
541
        parent1.revision = 'result'
 
542
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
543
        parent2.revision = 'result'
 
544
        inv.add(parent1)
 
545
        delta = [(None, u'dir2', 'p-1', parent2)]
 
546
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
547
            inv, delta)
 
548
 
 
549
    def test_add_new_id_existing_path(self):
 
550
        inv = self.get_empty_inventory()
 
551
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
552
        parent1.revision = 'result'
 
553
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
554
        parent2.revision = 'result'
 
555
        inv.add(parent1)
 
556
        delta = [(None, u'dir1', 'p-2', parent2)]
 
557
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
558
            inv, delta)
 
559
 
 
560
    def test_remove_dir_leaving_dangling_child(self):
 
561
        inv = self.get_empty_inventory()
 
562
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
563
        dir1.revision = 'result'
 
564
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
565
        dir2.revision = 'result'
 
566
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
567
        dir3.revision = 'result'
 
568
        inv.add(dir1)
 
569
        inv.add(dir2)
 
570
        inv.add(dir3)
 
571
        delta = [(u'dir1', None, 'p-1', None),
 
572
            (u'dir1/child2', None, 'p-3', None)]
 
573
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
574
            inv, delta)
 
575
 
 
576
 
 
577
class TestInventory(TestCase):
 
578
 
 
579
    def test_is_root(self):
 
580
        """Ensure our root-checking code is accurate."""
 
581
        inv = inventory.Inventory('TREE_ROOT')
 
582
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
583
        self.assertFalse(inv.is_root('booga'))
 
584
        inv.root.file_id = 'booga'
 
585
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
586
        self.assertTrue(inv.is_root('booga'))
 
587
        # works properly even if no root is set
 
588
        inv.root = None
 
589
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
590
        self.assertFalse(inv.is_root('booga'))
 
591
 
 
592
 
 
593
class TestInventoryEntry(TestCase):
 
594
 
 
595
    def test_file_kind_character(self):
 
596
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
597
        self.assertEqual(file.kind_character(), '')
 
598
 
 
599
    def test_dir_kind_character(self):
 
600
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
601
        self.assertEqual(dir.kind_character(), '/')
 
602
 
 
603
    def test_link_kind_character(self):
 
604
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
605
        self.assertEqual(dir.kind_character(), '')
 
606
 
 
607
    def test_dir_detect_changes(self):
 
608
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
609
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
610
        self.assertEqual((False, False), left.detect_changes(right))
 
611
        self.assertEqual((False, False), right.detect_changes(left))
 
612
 
 
613
    def test_file_detect_changes(self):
 
614
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
615
        left.text_sha1 = 123
 
616
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
617
        right.text_sha1 = 123
 
618
        self.assertEqual((False, False), left.detect_changes(right))
 
619
        self.assertEqual((False, False), right.detect_changes(left))
 
620
        left.executable = True
 
621
        self.assertEqual((False, True), left.detect_changes(right))
 
622
        self.assertEqual((False, True), right.detect_changes(left))
 
623
        right.text_sha1 = 321
 
624
        self.assertEqual((True, True), left.detect_changes(right))
 
625
        self.assertEqual((True, True), right.detect_changes(left))
 
626
 
 
627
    def test_symlink_detect_changes(self):
 
628
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
629
        left.symlink_target='foo'
 
630
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
631
        right.symlink_target='foo'
 
632
        self.assertEqual((False, False), left.detect_changes(right))
 
633
        self.assertEqual((False, False), right.detect_changes(left))
 
634
        left.symlink_target = 'different'
 
635
        self.assertEqual((True, False), left.detect_changes(right))
 
636
        self.assertEqual((True, False), right.detect_changes(left))
 
637
 
 
638
    def test_file_has_text(self):
 
639
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
640
        self.failUnless(file.has_text())
 
641
 
 
642
    def test_directory_has_text(self):
 
643
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
644
        self.failIf(dir.has_text())
 
645
 
 
646
    def test_link_has_text(self):
 
647
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
648
        self.failIf(link.has_text())
 
649
 
 
650
    def test_make_entry(self):
 
651
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
652
            inventory.InventoryFile)
 
653
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
654
            inventory.InventoryLink)
 
655
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
656
            inventory.InventoryDirectory)
 
657
 
 
658
    def test_make_entry_non_normalized(self):
 
659
        orig_normalized_filename = osutils.normalized_filename
 
660
 
 
661
        try:
 
662
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
663
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
664
            self.assertEqual(u'\xe5', entry.name)
 
665
            self.assertIsInstance(entry, inventory.InventoryFile)
 
666
 
 
667
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
668
            self.assertRaises(errors.InvalidNormalization,
 
669
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
670
        finally:
 
671
            osutils.normalized_filename = orig_normalized_filename
 
672
 
 
673
 
 
674
class TestDescribeChanges(TestCase):
 
675
 
 
676
    def test_describe_change(self):
 
677
        # we need to test the following change combinations:
 
678
        # rename
 
679
        # reparent
 
680
        # modify
 
681
        # gone
 
682
        # added
 
683
        # renamed/reparented and modified
 
684
        # change kind (perhaps can't be done yet?)
 
685
        # also, merged in combination with all of these?
 
686
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
687
        old_a.text_sha1 = '123132'
 
688
        old_a.text_size = 0
 
689
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
690
        new_a.text_sha1 = '123132'
 
691
        new_a.text_size = 0
 
692
 
 
693
        self.assertChangeDescription('unchanged', old_a, new_a)
 
694
 
 
695
        new_a.text_size = 10
 
696
        new_a.text_sha1 = 'abcabc'
 
697
        self.assertChangeDescription('modified', old_a, new_a)
 
698
 
 
699
        self.assertChangeDescription('added', None, new_a)
 
700
        self.assertChangeDescription('removed', old_a, None)
 
701
        # perhaps a bit questionable but seems like the most reasonable thing...
 
702
        self.assertChangeDescription('unchanged', None, None)
 
703
 
 
704
        # in this case it's both renamed and modified; show a rename and
 
705
        # modification:
 
706
        new_a.name = 'newfilename'
 
707
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
708
 
 
709
        # reparenting is 'renaming'
 
710
        new_a.name = old_a.name
 
711
        new_a.parent_id = 'somedir-id'
 
712
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
713
 
 
714
        # reset the content values so its not modified
 
715
        new_a.text_size = old_a.text_size
 
716
        new_a.text_sha1 = old_a.text_sha1
 
717
        new_a.name = old_a.name
 
718
 
 
719
        new_a.name = 'newfilename'
 
720
        self.assertChangeDescription('renamed', old_a, new_a)
 
721
 
 
722
        # reparenting is 'renaming'
 
723
        new_a.name = old_a.name
 
724
        new_a.parent_id = 'somedir-id'
 
725
        self.assertChangeDescription('renamed', old_a, new_a)
 
726
 
 
727
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
728
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
729
        self.assertEqual(expected_change, change)
 
730
 
 
731
 
 
732
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
733
 
 
734
    def get_chk_bytes(self):
 
735
        factory = groupcompress.make_pack_factory(True, True, 1)
 
736
        trans = self.get_transport('')
 
737
        return factory(trans)
 
738
 
 
739
    def read_bytes(self, chk_bytes, key):
 
740
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
741
        return stream.next().get_bytes_as("fulltext")
 
742
 
 
743
    def test_deserialise_gives_CHKInventory(self):
 
744
        inv = Inventory()
 
745
        inv.revision_id = "revid"
 
746
        inv.root.revision = "rootrev"
 
747
        chk_bytes = self.get_chk_bytes()
 
748
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
749
        bytes = ''.join(chk_inv.to_lines())
 
750
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
751
        self.assertEqual("revid", new_inv.revision_id)
 
752
        self.assertEqual("directory", new_inv.root.kind)
 
753
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
754
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
755
        self.assertEqual(inv.root.name, new_inv.root.name)
 
756
        self.assertEqual("rootrev", new_inv.root.revision)
 
757
        self.assertEqual('plain', new_inv._search_key_name)
 
758
 
 
759
    def test_deserialise_wrong_revid(self):
 
760
        inv = Inventory()
 
761
        inv.revision_id = "revid"
 
762
        inv.root.revision = "rootrev"
 
763
        chk_bytes = self.get_chk_bytes()
 
764
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
765
        bytes = ''.join(chk_inv.to_lines())
 
766
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
767
            bytes, ("revid2",))
 
768
 
 
769
    def test_captures_rev_root_byid(self):
 
770
        inv = Inventory()
 
771
        inv.revision_id = "foo"
 
772
        inv.root.revision = "bar"
 
773
        chk_bytes = self.get_chk_bytes()
 
774
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
775
        lines = chk_inv.to_lines()
 
776
        self.assertEqual([
 
777
            'chkinventory:\n',
 
778
            'revision_id: foo\n',
 
779
            'root_id: TREE_ROOT\n',
 
780
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
781
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
782
            ], lines)
 
783
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
784
        self.assertEqual('plain', chk_inv._search_key_name)
 
785
 
 
786
    def test_captures_parent_id_basename_index(self):
 
787
        inv = Inventory()
 
788
        inv.revision_id = "foo"
 
789
        inv.root.revision = "bar"
 
790
        chk_bytes = self.get_chk_bytes()
 
791
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
792
        lines = chk_inv.to_lines()
 
793
        self.assertEqual([
 
794
            'chkinventory:\n',
 
795
            'revision_id: foo\n',
 
796
            'root_id: TREE_ROOT\n',
 
797
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
798
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
799
            ], lines)
 
800
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
801
        self.assertEqual('plain', chk_inv._search_key_name)
 
802
 
 
803
    def test_captures_search_key_name(self):
 
804
        inv = Inventory()
 
805
        inv.revision_id = "foo"
 
806
        inv.root.revision = "bar"
 
807
        chk_bytes = self.get_chk_bytes()
 
808
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
809
                                              search_key_name='hash-16-way')
 
810
        lines = chk_inv.to_lines()
 
811
        self.assertEqual([
 
812
            'chkinventory:\n',
 
813
            'search_key_name: hash-16-way\n',
 
814
            'root_id: TREE_ROOT\n',
 
815
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
816
            'revision_id: foo\n',
 
817
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
818
            ], lines)
 
819
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
820
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
821
 
 
822
    def test_directory_children_on_demand(self):
 
823
        inv = Inventory()
 
824
        inv.revision_id = "revid"
 
825
        inv.root.revision = "rootrev"
 
826
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
827
        inv["fileid"].revision = "filerev"
 
828
        inv["fileid"].executable = True
 
829
        inv["fileid"].text_sha1 = "ffff"
 
830
        inv["fileid"].text_size = 1
 
831
        chk_bytes = self.get_chk_bytes()
 
832
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
833
        bytes = ''.join(chk_inv.to_lines())
 
834
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
835
        root_entry = new_inv[inv.root.file_id]
 
836
        self.assertEqual(None, root_entry._children)
 
837
        self.assertEqual(['file'], root_entry.children.keys())
 
838
        file_direct = new_inv["fileid"]
 
839
        file_found = root_entry.children['file']
 
840
        self.assertEqual(file_direct.kind, file_found.kind)
 
841
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
842
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
843
        self.assertEqual(file_direct.name, file_found.name)
 
844
        self.assertEqual(file_direct.revision, file_found.revision)
 
845
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
846
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
847
        self.assertEqual(file_direct.executable, file_found.executable)
 
848
 
 
849
    def test_from_inventory_maximum_size(self):
 
850
        # from_inventory supports the maximum_size parameter.
 
851
        inv = Inventory()
 
852
        inv.revision_id = "revid"
 
853
        inv.root.revision = "rootrev"
 
854
        chk_bytes = self.get_chk_bytes()
 
855
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
856
        chk_inv.id_to_entry._ensure_root()
 
857
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
858
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
859
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
860
        p_id_basename._ensure_root()
 
861
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
862
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
863
 
 
864
    def test___iter__(self):
 
865
        inv = Inventory()
 
866
        inv.revision_id = "revid"
 
867
        inv.root.revision = "rootrev"
 
868
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
869
        inv["fileid"].revision = "filerev"
 
870
        inv["fileid"].executable = True
 
871
        inv["fileid"].text_sha1 = "ffff"
 
872
        inv["fileid"].text_size = 1
 
873
        chk_bytes = self.get_chk_bytes()
 
874
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
875
        bytes = ''.join(chk_inv.to_lines())
 
876
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
877
        fileids = list(new_inv.__iter__())
 
878
        fileids.sort()
 
879
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
880
 
 
881
    def test__len__(self):
 
882
        inv = Inventory()
 
883
        inv.revision_id = "revid"
 
884
        inv.root.revision = "rootrev"
 
885
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
886
        inv["fileid"].revision = "filerev"
 
887
        inv["fileid"].executable = True
 
888
        inv["fileid"].text_sha1 = "ffff"
 
889
        inv["fileid"].text_size = 1
 
890
        chk_bytes = self.get_chk_bytes()
 
891
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
892
        self.assertEqual(2, len(chk_inv))
 
893
 
 
894
    def test___getitem__(self):
 
895
        inv = Inventory()
 
896
        inv.revision_id = "revid"
 
897
        inv.root.revision = "rootrev"
 
898
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
899
        inv["fileid"].revision = "filerev"
 
900
        inv["fileid"].executable = True
 
901
        inv["fileid"].text_sha1 = "ffff"
 
902
        inv["fileid"].text_size = 1
 
903
        chk_bytes = self.get_chk_bytes()
 
904
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
905
        bytes = ''.join(chk_inv.to_lines())
 
906
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
907
        root_entry = new_inv[inv.root.file_id]
 
908
        file_entry = new_inv["fileid"]
 
909
        self.assertEqual("directory", root_entry.kind)
 
910
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
911
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
912
        self.assertEqual(inv.root.name, root_entry.name)
 
913
        self.assertEqual("rootrev", root_entry.revision)
 
914
        self.assertEqual("file", file_entry.kind)
 
915
        self.assertEqual("fileid", file_entry.file_id)
 
916
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
917
        self.assertEqual("file", file_entry.name)
 
918
        self.assertEqual("filerev", file_entry.revision)
 
919
        self.assertEqual("ffff", file_entry.text_sha1)
 
920
        self.assertEqual(1, file_entry.text_size)
 
921
        self.assertEqual(True, file_entry.executable)
 
922
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
923
 
 
924
    def test_has_id_true(self):
 
925
        inv = Inventory()
 
926
        inv.revision_id = "revid"
 
927
        inv.root.revision = "rootrev"
 
928
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
929
        inv["fileid"].revision = "filerev"
 
930
        inv["fileid"].executable = True
 
931
        inv["fileid"].text_sha1 = "ffff"
 
932
        inv["fileid"].text_size = 1
 
933
        chk_bytes = self.get_chk_bytes()
 
934
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
935
        self.assertTrue(chk_inv.has_id('fileid'))
 
936
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
937
 
 
938
    def test_has_id_not(self):
 
939
        inv = Inventory()
 
940
        inv.revision_id = "revid"
 
941
        inv.root.revision = "rootrev"
 
942
        chk_bytes = self.get_chk_bytes()
 
943
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
944
        self.assertFalse(chk_inv.has_id('fileid'))
 
945
 
 
946
    def test_id2path(self):
 
947
        inv = Inventory()
 
948
        inv.revision_id = "revid"
 
949
        inv.root.revision = "rootrev"
 
950
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
951
        fileentry = InventoryFile("fileid", "file", "dirid")
 
952
        inv.add(direntry)
 
953
        inv.add(fileentry)
 
954
        inv["fileid"].revision = "filerev"
 
955
        inv["fileid"].executable = True
 
956
        inv["fileid"].text_sha1 = "ffff"
 
957
        inv["fileid"].text_size = 1
 
958
        inv["dirid"].revision = "filerev"
 
959
        chk_bytes = self.get_chk_bytes()
 
960
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
961
        bytes = ''.join(chk_inv.to_lines())
 
962
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
963
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
964
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
965
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
966
 
 
967
    def test_path2id(self):
 
968
        inv = Inventory()
 
969
        inv.revision_id = "revid"
 
970
        inv.root.revision = "rootrev"
 
971
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
972
        fileentry = InventoryFile("fileid", "file", "dirid")
 
973
        inv.add(direntry)
 
974
        inv.add(fileentry)
 
975
        inv["fileid"].revision = "filerev"
 
976
        inv["fileid"].executable = True
 
977
        inv["fileid"].text_sha1 = "ffff"
 
978
        inv["fileid"].text_size = 1
 
979
        inv["dirid"].revision = "filerev"
 
980
        chk_bytes = self.get_chk_bytes()
 
981
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
982
        bytes = ''.join(chk_inv.to_lines())
 
983
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
984
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
985
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
986
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
987
 
 
988
    def test_create_by_apply_delta_sets_root(self):
 
989
        inv = Inventory()
 
990
        inv.revision_id = "revid"
 
991
        chk_bytes = self.get_chk_bytes()
 
992
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
993
        inv.add_path("", "directory", "myrootid", None)
 
994
        inv.revision_id = "expectedid"
 
995
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
996
        delta = [("", None, base_inv.root.file_id, None),
 
997
            (None, "",  "myrootid", inv.root)]
 
998
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
999
        self.assertEquals(reference_inv.root, new_inv.root)
 
1000
 
 
1001
    def test_create_by_apply_delta_empty_add_child(self):
 
1002
        inv = Inventory()
 
1003
        inv.revision_id = "revid"
 
1004
        inv.root.revision = "rootrev"
 
1005
        chk_bytes = self.get_chk_bytes()
 
1006
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1007
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1008
        a_entry.revision = "filerev"
 
1009
        a_entry.executable = True
 
1010
        a_entry.text_sha1 = "ffff"
 
1011
        a_entry.text_size = 1
 
1012
        inv.add(a_entry)
 
1013
        inv.revision_id = "expectedid"
 
1014
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1015
        delta = [(None, "A",  "A-id", a_entry)]
 
1016
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1017
        # new_inv should be the same as reference_inv.
 
1018
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1019
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1020
        reference_inv.id_to_entry._ensure_root()
 
1021
        new_inv.id_to_entry._ensure_root()
 
1022
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1023
            new_inv.id_to_entry._root_node._key)
 
1024
 
 
1025
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1026
        inv = Inventory()
 
1027
        inv.revision_id = "revid"
 
1028
        inv.root.revision = "rootrev"
 
1029
        chk_bytes = self.get_chk_bytes()
 
1030
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1031
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1032
        a_entry.revision = "filerev"
 
1033
        a_entry.executable = True
 
1034
        a_entry.text_sha1 = "ffff"
 
1035
        a_entry.text_size = 1
 
1036
        inv.add(a_entry)
 
1037
        inv.revision_id = "expectedid"
 
1038
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1039
        delta = [(None, "A",  "A-id", a_entry)]
 
1040
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1041
        reference_inv.id_to_entry._ensure_root()
 
1042
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1043
        new_inv.id_to_entry._ensure_root()
 
1044
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1045
        # new_inv should be the same as reference_inv.
 
1046
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1047
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1048
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1049
            new_inv.id_to_entry._root_node._key)
 
1050
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1051
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1052
 
 
1053
    def test_iter_changes(self):
 
1054
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1055
        # InterTree are coming.
 
1056
        inv = Inventory()
 
1057
        inv.revision_id = "revid"
 
1058
        inv.root.revision = "rootrev"
 
1059
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1060
        inv["fileid"].revision = "filerev"
 
1061
        inv["fileid"].executable = True
 
1062
        inv["fileid"].text_sha1 = "ffff"
 
1063
        inv["fileid"].text_size = 1
 
1064
        inv2 = Inventory()
 
1065
        inv2.revision_id = "revid2"
 
1066
        inv2.root.revision = "rootrev"
 
1067
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1068
        inv2["fileid"].revision = "filerev2"
 
1069
        inv2["fileid"].executable = False
 
1070
        inv2["fileid"].text_sha1 = "bbbb"
 
1071
        inv2["fileid"].text_size = 2
 
1072
        # get fresh objects.
 
1073
        chk_bytes = self.get_chk_bytes()
 
1074
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1075
        bytes = ''.join(chk_inv.to_lines())
 
1076
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1077
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1078
        bytes = ''.join(chk_inv2.to_lines())
 
1079
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1080
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1081
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1082
            (False, True))],
 
1083
            list(inv_1.iter_changes(inv_2)))
 
1084
 
 
1085
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1086
        inv = Inventory()
 
1087
        inv.revision_id = "revid"
 
1088
        inv.root.revision = "rootrev"
 
1089
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1090
        inv["fileid"].revision = "filerev"
 
1091
        inv["fileid"].executable = True
 
1092
        inv["fileid"].text_sha1 = "ffff"
 
1093
        inv["fileid"].text_size = 1
 
1094
        # get fresh objects.
 
1095
        chk_bytes = self.get_chk_bytes()
 
1096
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1097
        bytes = ''.join(tmp_inv.to_lines())
 
1098
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1099
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1100
        self.assertEqual(
 
1101
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1102
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1103
 
 
1104
    def test_file_entry_to_bytes(self):
 
1105
        inv = CHKInventory(None)
 
1106
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
 
1107
        ie.executable = True
 
1108
        ie.revision = 'file-rev-id'
 
1109
        ie.text_sha1 = 'abcdefgh'
 
1110
        ie.text_size = 100
 
1111
        bytes = inv._entry_to_bytes(ie)
 
1112
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
 
1113
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1114
        ie2 = inv._bytes_to_entry(bytes)
 
1115
        self.assertEqual(ie, ie2)
 
1116
        self.assertIsInstance(ie2.name, unicode)
 
1117
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
 
1118
                         inv._bytes_to_utf8name_key(bytes))
 
1119
 
 
1120
    def test_file2_entry_to_bytes(self):
 
1121
        inv = CHKInventory(None)
 
1122
        # \u30a9 == 'omega'
 
1123
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
 
1124
        ie.executable = False
 
1125
        ie.revision = 'file-rev-id'
 
1126
        ie.text_sha1 = '123456'
 
1127
        ie.text_size = 25
 
1128
        bytes = inv._entry_to_bytes(ie)
 
1129
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
 
1130
                         'file-rev-id\n123456\n25\nN', bytes)
 
1131
        ie2 = inv._bytes_to_entry(bytes)
 
1132
        self.assertEqual(ie, ie2)
 
1133
        self.assertIsInstance(ie2.name, unicode)
 
1134
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
 
1135
                         inv._bytes_to_utf8name_key(bytes))
 
1136
 
 
1137
    def test_dir_entry_to_bytes(self):
 
1138
        inv = CHKInventory(None)
 
1139
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
 
1140
        ie.revision = 'dir-rev-id'
 
1141
        bytes = inv._entry_to_bytes(ie)
 
1142
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1143
        ie2 = inv._bytes_to_entry(bytes)
 
1144
        self.assertEqual(ie, ie2)
 
1145
        self.assertIsInstance(ie2.name, unicode)
 
1146
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
 
1147
                         inv._bytes_to_utf8name_key(bytes))
 
1148
 
 
1149
    def test_dir2_entry_to_bytes(self):
 
1150
        inv = CHKInventory(None)
 
1151
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
 
1152
                                          None)
 
1153
        ie.revision = 'dir-rev-id'
 
1154
        bytes = inv._entry_to_bytes(ie)
 
1155
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
 
1156
                         'dir-rev-id', bytes)
 
1157
        ie2 = inv._bytes_to_entry(bytes)
 
1158
        self.assertEqual(ie, ie2)
 
1159
        self.assertIsInstance(ie2.name, unicode)
 
1160
        self.assertIs(ie2.parent_id, None)
 
1161
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
 
1162
                         inv._bytes_to_utf8name_key(bytes))
 
1163
 
 
1164
    def test_symlink_entry_to_bytes(self):
 
1165
        inv = CHKInventory(None)
 
1166
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1167
        ie.revision = 'link-rev-id'
 
1168
        ie.symlink_target = u'target/path'
 
1169
        bytes = inv._entry_to_bytes(ie)
 
1170
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1171
                         'link-rev-id\ntarget/path', bytes)
 
1172
        ie2 = inv._bytes_to_entry(bytes)
 
1173
        self.assertEqual(ie, ie2)
 
1174
        self.assertIsInstance(ie2.name, unicode)
 
1175
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1176
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1177
                         inv._bytes_to_utf8name_key(bytes))
 
1178
 
 
1179
    def test_symlink2_entry_to_bytes(self):
 
1180
        inv = CHKInventory(None)
 
1181
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1182
        ie.revision = 'link-rev-id'
 
1183
        ie.symlink_target = u'target/\u03a9path'
 
1184
        bytes = inv._entry_to_bytes(ie)
 
1185
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1186
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1187
        ie2 = inv._bytes_to_entry(bytes)
 
1188
        self.assertEqual(ie, ie2)
 
1189
        self.assertIsInstance(ie2.name, unicode)
 
1190
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1191
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1192
                         inv._bytes_to_utf8name_key(bytes))
 
1193
 
 
1194
    def test_tree_reference_entry_to_bytes(self):
 
1195
        inv = CHKInventory(None)
 
1196
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1197
                                     'parent-id')
 
1198
        ie.revision = 'tree-rev-id'
 
1199
        ie.reference_revision = 'ref-rev-id'
 
1200
        bytes = inv._entry_to_bytes(ie)
 
1201
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1202
                         'tree-rev-id\nref-rev-id', bytes)
 
1203
        ie2 = inv._bytes_to_entry(bytes)
 
1204
        self.assertEqual(ie, ie2)
 
1205
        self.assertIsInstance(ie2.name, unicode)
 
1206
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1207
                         inv._bytes_to_utf8name_key(bytes))
 
1208
 
 
1209
 
 
1210
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1211
 
 
1212
    def get_chk_bytes(self):
 
1213
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1214
        trans = self.get_transport('')
 
1215
        return factory(trans)
 
1216
 
 
1217
    def make_dir(self, inv, name, parent_id):
 
1218
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1219
 
 
1220
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1221
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1222
        ie.text_sha1 = osutils.sha_string(content)
 
1223
        ie.text_size = len(content)
 
1224
        inv.add(ie)
 
1225
 
 
1226
    def make_simple_inventory(self):
 
1227
        inv = Inventory('TREE_ROOT')
 
1228
        inv.revision_id = "revid"
 
1229
        inv.root.revision = "rootrev"
 
1230
        # /                 TREE_ROOT
 
1231
        # dir1/             dir1-id
 
1232
        #   sub-file1       sub-file1-id
 
1233
        #   sub-file2       sub-file2-id
 
1234
        #   sub-dir1/       sub-dir1-id
 
1235
        #     subsub-file1  subsub-file1-id
 
1236
        # dir2/             dir2-id
 
1237
        #   sub2-file1      sub2-file1-id
 
1238
        # top               top-id
 
1239
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1240
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1241
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1242
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1243
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1244
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1245
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1246
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1247
        chk_bytes = self.get_chk_bytes()
 
1248
        #  use a small maximum_size to force internal paging structures
 
1249
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1250
                        maximum_size=100,
 
1251
                        search_key_name='hash-255-way')
 
1252
        bytes = ''.join(chk_inv.to_lines())
 
1253
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1254
 
 
1255
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1256
        self.assertEqual(sorted(expected_fileids),
 
1257
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1258
 
 
1259
    def assertExpand(self, all_ids, inv, file_ids):
 
1260
        (val_all_ids,
 
1261
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1262
        self.assertEqual(set(all_ids), val_all_ids)
 
1263
        entries = inv._getitems(val_all_ids)
 
1264
        expected_children = {}
 
1265
        for entry in entries:
 
1266
            s = expected_children.setdefault(entry.parent_id, [])
 
1267
            s.append(entry.file_id)
 
1268
        val_children = dict((k, sorted(v)) for k, v
 
1269
                            in val_children.iteritems())
 
1270
        expected_children = dict((k, sorted(v)) for k, v
 
1271
                            in expected_children.iteritems())
 
1272
        self.assertEqual(expected_children, val_children)
 
1273
 
 
1274
    def test_make_simple_inventory(self):
 
1275
        inv = self.make_simple_inventory()
 
1276
        layout = []
 
1277
        for path, entry in inv.iter_entries_by_dir():
 
1278
            layout.append((path, entry.file_id))
 
1279
        self.assertEqual([
 
1280
            ('', 'TREE_ROOT'),
 
1281
            ('dir1', 'dir1-id'),
 
1282
            ('dir2', 'dir2-id'),
 
1283
            ('top', 'top-id'),
 
1284
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1285
            ('dir1/sub-file1', 'sub-file1-id'),
 
1286
            ('dir1/sub-file2', 'sub-file2-id'),
 
1287
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1288
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1289
            ], layout)
 
1290
 
 
1291
    def test__getitems(self):
 
1292
        inv = self.make_simple_inventory()
 
1293
        # Reading from disk
 
1294
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1295
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1296
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1297
        # From cache
 
1298
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1299
        # Mixed
 
1300
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1301
                             ['dir1-id', 'sub-file2-id'])
 
1302
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1303
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1304
 
 
1305
    def test_single_file(self):
 
1306
        inv = self.make_simple_inventory()
 
1307
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1308
 
 
1309
    def test_get_all_parents(self):
 
1310
        inv = self.make_simple_inventory()
 
1311
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1312
                           'subsub-file1-id',
 
1313
                          ], inv, ['subsub-file1-id'])
 
1314
 
 
1315
    def test_get_children(self):
 
1316
        inv = self.make_simple_inventory()
 
1317
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1318
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1319
                          ], inv, ['dir1-id'])
 
1320
 
 
1321
    def test_from_root(self):
 
1322
        inv = self.make_simple_inventory()
 
1323
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1324
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1325
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1326
 
 
1327
    def test_top_level_file(self):
 
1328
        inv = self.make_simple_inventory()
 
1329
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1330
 
 
1331
    def test_subsub_file(self):
 
1332
        inv = self.make_simple_inventory()
 
1333
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1334
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1335
 
 
1336
    def test_sub_and_root(self):
 
1337
        inv = self.make_simple_inventory()
 
1338
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1339
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])