~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Martin Pool
  • Date: 2010-01-12 02:00:23 UTC
  • mto: This revision was merged to the branch mainline in revision 4949.
  • Revision ID: mbp@sourcefrog.net-20100112020023-ib3ii1wcpvljmprk
Update bug handling doc to deprecate fixcommitted and to explain other states better

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005, 2006, 2007, 2008, 2009 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
from bzrlib.selftest import TestBase
18
 
 
19
 
from bzrlib.inventory import Inventory, InventoryEntry
20
 
 
21
 
 
22
 
class TestIsWithin(TestBase):
23
 
    def runTest(self):
24
 
        from bzrlib.osutils import is_inside_any
25
 
        
26
 
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
27
 
                         (['src'], 'src/foo.c'),
28
 
                         (['src'], 'src'),
29
 
                         ]:
30
 
            self.assert_(is_inside_any(dirs, fn))
31
 
            
32
 
        for dirs, fn in [(['src'], 'srccontrol'),
33
 
                         (['src'], 'srccontrol/foo')]:
34
 
            self.assertFalse(is_inside_any(dirs, fn))
35
 
            
36
 
            
37
 
            
38
 
class TestInventoryIds(TestBase):
39
 
    def runTest(self):
40
 
        """Test detection of files within selected directories."""
41
 
        inv = Inventory()
42
 
        
43
 
        for args in [('src', 'directory', 'src-id'), 
44
 
                     ('doc', 'directory', 'doc-id'), 
45
 
                     ('src/hello.c', 'file'),
46
 
                     ('src/bye.c', 'file', 'bye-id'),
47
 
                     ('Makefile', 'file')]:
48
 
            inv.add_path(*args)
49
 
            
50
 
        self.assertEqual(inv.path2id('src'), 'src-id')
51
 
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
52
 
        
53
 
        self.assert_('src-id' in inv)
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from bzrlib import (
 
19
    chk_map,
 
20
    groupcompress,
 
21
    bzrdir,
 
22
    errors,
 
23
    inventory,
 
24
    osutils,
 
25
    repository,
 
26
    revision,
 
27
    tests,
 
28
    )
 
29
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
 
30
    InventoryDirectory, InventoryEntry, TreeReference)
 
31
from bzrlib.tests import (
 
32
    TestCase,
 
33
    TestCaseWithTransport,
 
34
    condition_isinstance,
 
35
    multiply_tests,
 
36
    split_suite_by_condition,
 
37
    )
 
38
from bzrlib.tests.per_workingtree import workingtree_formats
 
39
 
 
40
 
 
41
def load_tests(standard_tests, module, loader):
 
42
    """Parameterise some inventory tests."""
 
43
    to_adapt, result = split_suite_by_condition(standard_tests,
 
44
        condition_isinstance(TestDeltaApplication))
 
45
    scenarios = [
 
46
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
47
        ]
 
48
    # Working tree basis delta application
 
49
    # Repository add_inv_by_delta.
 
50
    # Reduce form of the per_repository test logic - that logic needs to be
 
51
    # be able to get /just/ repositories whereas these tests are fine with
 
52
    # just creating trees.
 
53
    formats = set()
 
54
    for _, format in repository.format_registry.iteritems():
 
55
        scenarios.append((str(format.__name__), {
 
56
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
57
            'format':format}))
 
58
    for format in workingtree_formats():
 
59
        scenarios.append(
 
60
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
61
            'apply_delta':apply_inventory_WT_basis,
 
62
            'format':format}))
 
63
        scenarios.append(
 
64
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
65
            'apply_delta':apply_inventory_WT,
 
66
            'format':format}))
 
67
    return multiply_tests(to_adapt, scenarios, result)
 
68
 
 
69
 
 
70
def create_texts_for_inv(repo, inv):
 
71
    for path, ie in inv.iter_entries():
 
72
        if ie.text_size:
 
73
            lines = ['a' * ie.text_size]
 
74
        else:
 
75
            lines = []
 
76
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
77
    
 
78
def apply_inventory_Inventory(self, basis, delta):
 
79
    """Apply delta to basis and return the result.
 
80
    
 
81
    :param basis: An inventory to be used as the basis.
 
82
    :param delta: The inventory delta to apply:
 
83
    :return: An inventory resulting from the application.
 
84
    """
 
85
    basis.apply_delta(delta)
 
86
    return basis
 
87
 
 
88
 
 
89
def apply_inventory_WT(self, basis, delta):
 
90
    """Apply delta to basis and return the result.
 
91
 
 
92
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
93
    
 
94
    :param basis: An inventory to be used as the basis.
 
95
    :param delta: The inventory delta to apply:
 
96
    :return: An inventory resulting from the application.
 
97
    """
 
98
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
99
    control.create_repository()
 
100
    control.create_branch()
 
101
    tree = self.format.initialize(control)
 
102
    tree.lock_write()
 
103
    try:
 
104
        tree._write_inventory(basis)
 
105
    finally:
 
106
        tree.unlock()
 
107
    # Fresh object, reads disk again.
 
108
    tree = tree.bzrdir.open_workingtree()
 
109
    tree.lock_write()
 
110
    try:
 
111
        tree.apply_inventory_delta(delta)
 
112
    finally:
 
113
        tree.unlock()
 
114
    # reload tree - ensure we get what was written.
 
115
    tree = tree.bzrdir.open_workingtree()
 
116
    tree.lock_read()
 
117
    self.addCleanup(tree.unlock)
 
118
    # One could add 'tree._validate' here but that would cause 'early' failues 
 
119
    # as far as higher level code is concerned. Possibly adding an
 
120
    # expect_fail parameter to this function and if that is False then do a
 
121
    # validate call.
 
122
    return tree.inventory
 
123
 
 
124
 
 
125
def apply_inventory_WT_basis(self, basis, delta):
 
126
    """Apply delta to basis and return the result.
 
127
 
 
128
    This sets the parent and then calls update_basis_by_delta.
 
129
    It also puts the basis in the repository under both 'basis' and 'result' to
 
130
    allow safety checks made by the WT to succeed, and finally ensures that all
 
131
    items in the delta with a new path are present in the WT before calling
 
132
    update_basis_by_delta.
 
133
    
 
134
    :param basis: An inventory to be used as the basis.
 
135
    :param delta: The inventory delta to apply:
 
136
    :return: An inventory resulting from the application.
 
137
    """
 
138
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
139
    control.create_repository()
 
140
    control.create_branch()
 
141
    tree = self.format.initialize(control)
 
142
    tree.lock_write()
 
143
    try:
 
144
        repo = tree.branch.repository
 
145
        repo.start_write_group()
 
146
        try:
 
147
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
148
                message="", committer="foo@example.com")
 
149
            basis.revision_id = 'basis'
 
150
            create_texts_for_inv(tree.branch.repository, basis)
 
151
            repo.add_revision('basis', rev, basis)
 
152
            # Add a revision for the result, with the basis content - 
 
153
            # update_basis_by_delta doesn't check that the delta results in
 
154
            # result, and we want inconsistent deltas to get called on the
 
155
            # tree, or else the code isn't actually checked.
 
156
            rev = revision.Revision('result', timestamp=0, timezone=None,
 
157
                message="", committer="foo@example.com")
 
158
            basis.revision_id = 'result'
 
159
            repo.add_revision('result', rev, basis)
 
160
            repo.commit_write_group()
 
161
        except:
 
162
            repo.abort_write_group()
 
163
            raise
 
164
        # Set the basis state as the trees current state
 
165
        tree._write_inventory(basis)
 
166
        # This reads basis from the repo and puts it into the tree's local
 
167
        # cache, if it has one.
 
168
        tree.set_parent_ids(['basis'])
 
169
        paths = {}
 
170
        parents = set()
 
171
        for old, new, id, entry in delta:
 
172
            if None in (new, entry):
 
173
                continue
 
174
            paths[new] = (entry.file_id, entry.kind)
 
175
            parents.add(osutils.dirname(new))
 
176
        parents = osutils.minimum_path_selection(parents)
 
177
        parents.discard('')
 
178
        # Put place holders in the tree to permit adding the other entries.
 
179
        for pos, parent in enumerate(parents):
 
180
            if not tree.path2id(parent):
 
181
                # add a synthetic directory in the tree so we can can put the
 
182
                # tree0 entries in place for dirstate.
 
183
                tree.add([parent], ["id%d" % pos], ["directory"])
 
184
        if paths:
 
185
            # Many deltas may cause this mini-apply to fail, but we want to see what
 
186
            # the delta application code says, not the prep that we do to deal with 
 
187
            # limitations of dirstate's update_basis code.
 
188
            for path, (file_id, kind) in sorted(paths.items()):
 
189
                try:
 
190
                    tree.add([path], [file_id], [kind])
 
191
                except (KeyboardInterrupt, SystemExit):
 
192
                    raise
 
193
                except:
 
194
                    pass
 
195
    finally:
 
196
        tree.unlock()
 
197
    # Fresh lock, reads disk again.
 
198
    tree.lock_write()
 
199
    try:
 
200
        tree.update_basis_by_delta('result', delta)
 
201
    finally:
 
202
        tree.unlock()
 
203
    # reload tree - ensure we get what was written.
 
204
    tree = tree.bzrdir.open_workingtree()
 
205
    basis_tree = tree.basis_tree()
 
206
    basis_tree.lock_read()
 
207
    self.addCleanup(basis_tree.unlock)
 
208
    # Note, that if the tree does not have a local cache, the trick above of
 
209
    # setting the result as the basis, will come back to bite us. That said,
 
210
    # all the implementations in bzr do have a local cache.
 
211
    return basis_tree.inventory
 
212
 
 
213
 
 
214
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
215
    """Apply delta to basis and return the result.
 
216
    
 
217
    This inserts basis as a whole inventory and then uses
 
218
    add_inventory_by_delta to add delta.
 
219
 
 
220
    :param basis: An inventory to be used as the basis.
 
221
    :param delta: The inventory delta to apply:
 
222
    :return: An inventory resulting from the application.
 
223
    """
 
224
    format = self.format()
 
225
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
 
226
    repo = format.initialize(control)
 
227
    repo.lock_write()
 
228
    try:
 
229
        repo.start_write_group()
 
230
        try:
 
231
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
232
                message="", committer="foo@example.com")
 
233
            basis.revision_id = 'basis'
 
234
            create_texts_for_inv(repo, basis)
 
235
            repo.add_revision('basis', rev, basis)
 
236
            repo.commit_write_group()
 
237
        except:
 
238
            repo.abort_write_group()
 
239
            raise
 
240
    finally:
 
241
        repo.unlock()
 
242
    repo.lock_write()
 
243
    try:
 
244
        repo.start_write_group()
 
245
        try:
 
246
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
247
                'result', ['basis'])
 
248
        except:
 
249
            repo.abort_write_group()
 
250
            raise
 
251
        else:
 
252
            repo.commit_write_group()
 
253
    finally:
 
254
        repo.unlock()
 
255
    # Fresh lock, reads disk again.
 
256
    repo = repo.bzrdir.open_repository()
 
257
    repo.lock_read()
 
258
    self.addCleanup(repo.unlock)
 
259
    return repo.get_inventory('result')
 
260
 
 
261
 
 
262
class TestInventoryUpdates(TestCase):
 
263
 
 
264
    def test_creation_from_root_id(self):
 
265
        # iff a root id is passed to the constructor, a root directory is made
 
266
        inv = inventory.Inventory(root_id='tree-root')
 
267
        self.assertNotEqual(None, inv.root)
 
268
        self.assertEqual('tree-root', inv.root.file_id)
 
269
 
 
270
    def test_add_path_of_root(self):
 
271
        # if no root id is given at creation time, there is no root directory
 
272
        inv = inventory.Inventory(root_id=None)
 
273
        self.assertIs(None, inv.root)
 
274
        # add a root entry by adding its path
 
275
        ie = inv.add_path("", "directory", "my-root")
 
276
        ie.revision = 'test-rev'
 
277
        self.assertEqual("my-root", ie.file_id)
 
278
        self.assertIs(ie, inv.root)
 
279
 
 
280
    def test_add_path(self):
 
281
        inv = inventory.Inventory(root_id='tree_root')
 
282
        ie = inv.add_path('hello', 'file', 'hello-id')
 
283
        self.assertEqual('hello-id', ie.file_id)
 
284
        self.assertEqual('file', ie.kind)
 
285
 
 
286
    def test_copy(self):
 
287
        """Make sure copy() works and creates a deep copy."""
 
288
        inv = inventory.Inventory(root_id='some-tree-root')
 
289
        ie = inv.add_path('hello', 'file', 'hello-id')
 
290
        inv2 = inv.copy()
 
291
        inv.root.file_id = 'some-new-root'
 
292
        ie.name = 'file2'
 
293
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
294
        self.assertEqual('hello', inv2['hello-id'].name)
 
295
 
 
296
    def test_copy_empty(self):
 
297
        """Make sure an empty inventory can be copied."""
 
298
        inv = inventory.Inventory(root_id=None)
 
299
        inv2 = inv.copy()
 
300
        self.assertIs(None, inv2.root)
 
301
 
 
302
    def test_copy_copies_root_revision(self):
 
303
        """Make sure the revision of the root gets copied."""
 
304
        inv = inventory.Inventory(root_id='someroot')
 
305
        inv.root.revision = 'therev'
 
306
        inv2 = inv.copy()
 
307
        self.assertEquals('someroot', inv2.root.file_id)
 
308
        self.assertEquals('therev', inv2.root.revision)
 
309
 
 
310
    def test_create_tree_reference(self):
 
311
        inv = inventory.Inventory('tree-root-123')
 
312
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
313
                              revision='rev', reference_revision='rev2'))
 
314
 
 
315
    def test_error_encoding(self):
 
316
        inv = inventory.Inventory('tree-root')
 
317
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
318
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
319
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
320
        self.assertContainsRe(str(e), r'\\u1234')
 
321
 
 
322
    def test_add_recursive(self):
 
323
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
324
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
325
        parent.children[child.file_id] = child
 
326
        inv = inventory.Inventory('tree-root')
 
327
        inv.add(parent)
 
328
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
329
 
 
330
 
 
331
 
 
332
class TestDeltaApplication(TestCaseWithTransport):
 
333
 
 
334
    def get_empty_inventory(self, reference_inv=None):
 
335
        """Get an empty inventory.
 
336
 
 
337
        Note that tests should not depend on the revision of the root for
 
338
        setting up test conditions, as it has to be flexible to accomodate non
 
339
        rich root repositories.
 
340
 
 
341
        :param reference_inv: If not None, get the revision for the root from
 
342
            this inventory. This is useful for dealing with older repositories
 
343
            that routinely discarded the root entry data. If None, the root's
 
344
            revision is set to 'basis'.
 
345
        """
 
346
        inv = inventory.Inventory()
 
347
        if reference_inv is not None:
 
348
            inv.root.revision = reference_inv.root.revision
 
349
        else:
 
350
            inv.root.revision = 'basis'
 
351
        return inv
 
352
 
 
353
    def test_empty_delta(self):
 
354
        inv = self.get_empty_inventory()
 
355
        delta = []
 
356
        inv = self.apply_delta(self, inv, delta)
 
357
        inv2 = self.get_empty_inventory(inv)
 
358
        self.assertEqual([], inv2._make_delta(inv))
 
359
 
 
360
    def test_None_file_id(self):
 
361
        inv = self.get_empty_inventory()
 
362
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
363
        dir1.revision = 'result'
 
364
        delta = [(None, u'dir1', None, dir1)]
 
365
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
366
            inv, delta)
 
367
 
 
368
    def test_unicode_file_id(self):
 
369
        inv = self.get_empty_inventory()
 
370
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
371
        dir1.revision = 'result'
 
372
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
373
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
374
            inv, delta)
 
375
 
 
376
    def test_repeated_file_id(self):
 
377
        inv = self.get_empty_inventory()
 
378
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
379
        file1.revision = 'result'
 
380
        file1.text_size = 0
 
381
        file1.text_sha1 = ""
 
382
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
 
383
        file2.revision = 'result'
 
384
        file2.text_size = 0
 
385
        file2.text_sha1 = ""
 
386
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
387
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
388
            inv, delta)
 
389
 
 
390
    def test_repeated_new_path(self):
 
391
        inv = self.get_empty_inventory()
 
392
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
393
        file1.revision = 'result'
 
394
        file1.text_size = 0
 
395
        file1.text_sha1 = ""
 
396
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
 
397
        file2.revision = 'result'
 
398
        file2.text_size = 0
 
399
        file2.text_sha1 = ""
 
400
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
401
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
402
            inv, delta)
 
403
 
 
404
    def test_repeated_old_path(self):
 
405
        inv = self.get_empty_inventory()
 
406
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
407
        file1.revision = 'result'
 
408
        file1.text_size = 0
 
409
        file1.text_sha1 = ""
 
410
        # We can't *create* a source inventory with the same path, but
 
411
        # a badly generated partial delta might claim the same source twice.
 
412
        # This would be buggy in two ways: the path is repeated in the delta,
 
413
        # And the path for one of the file ids doesn't match the source
 
414
        # location. Alternatively, we could have a repeated fileid, but that
 
415
        # is separately checked for.
 
416
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
417
        file2.revision = 'result'
 
418
        file2.text_size = 0
 
419
        file2.text_sha1 = ""
 
420
        inv.add(file1)
 
421
        inv.add(file2)
 
422
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
423
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
424
            inv, delta)
 
425
 
 
426
    def test_mismatched_id_entry_id(self):
 
427
        inv = self.get_empty_inventory()
 
428
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
429
        file1.revision = 'result'
 
430
        file1.text_size = 0
 
431
        file1.text_sha1 = ""
 
432
        delta = [(None, u'path', 'id', file1)]
 
433
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
434
            inv, delta)
 
435
 
 
436
    def test_mismatched_new_path_entry_None(self):
 
437
        inv = self.get_empty_inventory()
 
438
        delta = [(None, u'path', 'id', None)]
 
439
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
440
            inv, delta)
 
441
 
 
442
    def test_mismatched_new_path_None_entry(self):
 
443
        inv = self.get_empty_inventory()
 
444
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
445
        file1.revision = 'result'
 
446
        file1.text_size = 0
 
447
        file1.text_sha1 = ""
 
448
        delta = [(u"path", None, 'id1', file1)]
 
449
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
450
            inv, delta)
 
451
 
 
452
    def test_parent_is_not_directory(self):
 
453
        inv = self.get_empty_inventory()
 
454
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
455
        file1.revision = 'result'
 
456
        file1.text_size = 0
 
457
        file1.text_sha1 = ""
 
458
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
459
        file2.revision = 'result'
 
460
        file2.text_size = 0
 
461
        file2.text_sha1 = ""
 
462
        inv.add(file1)
 
463
        delta = [(None, u'path/path2', 'id2', file2)]
 
464
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
465
            inv, delta)
 
466
 
 
467
    def test_parent_is_missing(self):
 
468
        inv = self.get_empty_inventory()
 
469
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
470
        file2.revision = 'result'
 
471
        file2.text_size = 0
 
472
        file2.text_sha1 = ""
 
473
        delta = [(None, u'path/path2', 'id2', file2)]
 
474
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
475
            inv, delta)
 
476
 
 
477
    def test_new_parent_path_has_wrong_id(self):
 
478
        inv = self.get_empty_inventory()
 
479
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
480
        parent1.revision = 'result'
 
481
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
482
        parent2.revision = 'result'
 
483
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
484
        file1.revision = 'result'
 
485
        file1.text_size = 0
 
486
        file1.text_sha1 = ""
 
487
        inv.add(parent1)
 
488
        inv.add(parent2)
 
489
        # This delta claims that file1 is at dir/path, but actually its at
 
490
        # dir2/path if you follow the inventory parent structure.
 
491
        delta = [(None, u'dir/path', 'id', file1)]
 
492
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
493
            inv, delta)
 
494
 
 
495
    def test_old_parent_path_is_wrong(self):
 
496
        inv = self.get_empty_inventory()
 
497
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
498
        parent1.revision = 'result'
 
499
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
500
        parent2.revision = 'result'
 
501
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
502
        file1.revision = 'result'
 
503
        file1.text_size = 0
 
504
        file1.text_sha1 = ""
 
505
        inv.add(parent1)
 
506
        inv.add(parent2)
 
507
        inv.add(file1)
 
508
        # This delta claims that file1 was at dir/path, but actually it was at
 
509
        # dir2/path if you follow the inventory parent structure.
 
510
        delta = [(u'dir/path', None, 'id', None)]
 
511
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
512
            inv, delta)
 
513
 
 
514
    def test_old_parent_path_is_for_other_id(self):
 
515
        inv = self.get_empty_inventory()
 
516
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
517
        parent1.revision = 'result'
 
518
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
519
        parent2.revision = 'result'
 
520
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
521
        file1.revision = 'result'
 
522
        file1.text_size = 0
 
523
        file1.text_sha1 = ""
 
524
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
525
        file2.revision = 'result'
 
526
        file2.text_size = 0
 
527
        file2.text_sha1 = ""
 
528
        inv.add(parent1)
 
529
        inv.add(parent2)
 
530
        inv.add(file1)
 
531
        inv.add(file2)
 
532
        # This delta claims that file1 was at dir/path, but actually it was at
 
533
        # dir2/path if you follow the inventory parent structure. At dir/path
 
534
        # is another entry we should not delete.
 
535
        delta = [(u'dir/path', None, 'id', None)]
 
536
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
537
            inv, delta)
 
538
 
 
539
    def test_add_existing_id_new_path(self):
 
540
        inv = self.get_empty_inventory()
 
541
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
542
        parent1.revision = 'result'
 
543
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
544
        parent2.revision = 'result'
 
545
        inv.add(parent1)
 
546
        delta = [(None, u'dir2', 'p-1', parent2)]
 
547
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
548
            inv, delta)
 
549
 
 
550
    def test_add_new_id_existing_path(self):
 
551
        inv = self.get_empty_inventory()
 
552
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
553
        parent1.revision = 'result'
 
554
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
555
        parent2.revision = 'result'
 
556
        inv.add(parent1)
 
557
        delta = [(None, u'dir1', 'p-2', parent2)]
 
558
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
559
            inv, delta)
 
560
 
 
561
    def test_remove_dir_leaving_dangling_child(self):
 
562
        inv = self.get_empty_inventory()
 
563
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
564
        dir1.revision = 'result'
 
565
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
566
        dir2.revision = 'result'
 
567
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
568
        dir3.revision = 'result'
 
569
        inv.add(dir1)
 
570
        inv.add(dir2)
 
571
        inv.add(dir3)
 
572
        delta = [(u'dir1', None, 'p-1', None),
 
573
            (u'dir1/child2', None, 'p-3', None)]
 
574
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
575
            inv, delta)
 
576
 
 
577
 
 
578
class TestInventory(TestCase):
 
579
 
 
580
    def test_is_root(self):
 
581
        """Ensure our root-checking code is accurate."""
 
582
        inv = inventory.Inventory('TREE_ROOT')
 
583
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
584
        self.assertFalse(inv.is_root('booga'))
 
585
        inv.root.file_id = 'booga'
 
586
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
587
        self.assertTrue(inv.is_root('booga'))
 
588
        # works properly even if no root is set
 
589
        inv.root = None
 
590
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
591
        self.assertFalse(inv.is_root('booga'))
 
592
 
 
593
 
 
594
class TestInventoryEntry(TestCase):
 
595
 
 
596
    def test_file_kind_character(self):
 
597
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
598
        self.assertEqual(file.kind_character(), '')
 
599
 
 
600
    def test_dir_kind_character(self):
 
601
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
602
        self.assertEqual(dir.kind_character(), '/')
 
603
 
 
604
    def test_link_kind_character(self):
 
605
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
606
        self.assertEqual(dir.kind_character(), '')
 
607
 
 
608
    def test_dir_detect_changes(self):
 
609
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
610
        left.text_sha1 = 123
 
611
        left.executable = True
 
612
        left.symlink_target='foo'
 
613
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
614
        right.text_sha1 = 321
 
615
        right.symlink_target='bar'
 
616
        self.assertEqual((False, False), left.detect_changes(right))
 
617
        self.assertEqual((False, False), right.detect_changes(left))
 
618
 
 
619
    def test_file_detect_changes(self):
 
620
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
621
        left.text_sha1 = 123
 
622
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
623
        right.text_sha1 = 123
 
624
        self.assertEqual((False, False), left.detect_changes(right))
 
625
        self.assertEqual((False, False), right.detect_changes(left))
 
626
        left.executable = True
 
627
        self.assertEqual((False, True), left.detect_changes(right))
 
628
        self.assertEqual((False, True), right.detect_changes(left))
 
629
        right.text_sha1 = 321
 
630
        self.assertEqual((True, True), left.detect_changes(right))
 
631
        self.assertEqual((True, True), right.detect_changes(left))
 
632
 
 
633
    def test_symlink_detect_changes(self):
 
634
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
635
        left.text_sha1 = 123
 
636
        left.executable = True
 
637
        left.symlink_target='foo'
 
638
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
639
        right.text_sha1 = 321
 
640
        right.symlink_target='foo'
 
641
        self.assertEqual((False, False), left.detect_changes(right))
 
642
        self.assertEqual((False, False), right.detect_changes(left))
 
643
        left.symlink_target = 'different'
 
644
        self.assertEqual((True, False), left.detect_changes(right))
 
645
        self.assertEqual((True, False), right.detect_changes(left))
 
646
 
 
647
    def test_file_has_text(self):
 
648
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
 
649
        self.failUnless(file.has_text())
 
650
 
 
651
    def test_directory_has_text(self):
 
652
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
653
        self.failIf(dir.has_text())
 
654
 
 
655
    def test_link_has_text(self):
 
656
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
657
        self.failIf(link.has_text())
 
658
 
 
659
    def test_make_entry(self):
 
660
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
661
            inventory.InventoryFile)
 
662
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
663
            inventory.InventoryLink)
 
664
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
665
            inventory.InventoryDirectory)
 
666
 
 
667
    def test_make_entry_non_normalized(self):
 
668
        orig_normalized_filename = osutils.normalized_filename
 
669
 
 
670
        try:
 
671
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
672
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
673
            self.assertEqual(u'\xe5', entry.name)
 
674
            self.assertIsInstance(entry, inventory.InventoryFile)
 
675
 
 
676
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
677
            self.assertRaises(errors.InvalidNormalization,
 
678
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
679
        finally:
 
680
            osutils.normalized_filename = orig_normalized_filename
 
681
 
 
682
 
 
683
class TestDescribeChanges(TestCase):
 
684
 
 
685
    def test_describe_change(self):
 
686
        # we need to test the following change combinations:
 
687
        # rename
 
688
        # reparent
 
689
        # modify
 
690
        # gone
 
691
        # added
 
692
        # renamed/reparented and modified
 
693
        # change kind (perhaps can't be done yet?)
 
694
        # also, merged in combination with all of these?
 
695
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
696
        old_a.text_sha1 = '123132'
 
697
        old_a.text_size = 0
 
698
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
699
        new_a.text_sha1 = '123132'
 
700
        new_a.text_size = 0
 
701
 
 
702
        self.assertChangeDescription('unchanged', old_a, new_a)
 
703
 
 
704
        new_a.text_size = 10
 
705
        new_a.text_sha1 = 'abcabc'
 
706
        self.assertChangeDescription('modified', old_a, new_a)
 
707
 
 
708
        self.assertChangeDescription('added', None, new_a)
 
709
        self.assertChangeDescription('removed', old_a, None)
 
710
        # perhaps a bit questionable but seems like the most reasonable thing...
 
711
        self.assertChangeDescription('unchanged', None, None)
 
712
 
 
713
        # in this case it's both renamed and modified; show a rename and
 
714
        # modification:
 
715
        new_a.name = 'newfilename'
 
716
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
717
 
 
718
        # reparenting is 'renaming'
 
719
        new_a.name = old_a.name
 
720
        new_a.parent_id = 'somedir-id'
 
721
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
722
 
 
723
        # reset the content values so its not modified
 
724
        new_a.text_size = old_a.text_size
 
725
        new_a.text_sha1 = old_a.text_sha1
 
726
        new_a.name = old_a.name
 
727
 
 
728
        new_a.name = 'newfilename'
 
729
        self.assertChangeDescription('renamed', old_a, new_a)
 
730
 
 
731
        # reparenting is 'renaming'
 
732
        new_a.name = old_a.name
 
733
        new_a.parent_id = 'somedir-id'
 
734
        self.assertChangeDescription('renamed', old_a, new_a)
 
735
 
 
736
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
737
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
738
        self.assertEqual(expected_change, change)
 
739
 
 
740
 
 
741
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
742
 
 
743
    def get_chk_bytes(self):
 
744
        factory = groupcompress.make_pack_factory(True, True, 1)
 
745
        trans = self.get_transport('')
 
746
        return factory(trans)
 
747
 
 
748
    def read_bytes(self, chk_bytes, key):
 
749
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
750
        return stream.next().get_bytes_as("fulltext")
 
751
 
 
752
    def test_deserialise_gives_CHKInventory(self):
 
753
        inv = Inventory()
 
754
        inv.revision_id = "revid"
 
755
        inv.root.revision = "rootrev"
 
756
        chk_bytes = self.get_chk_bytes()
 
757
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
758
        bytes = ''.join(chk_inv.to_lines())
 
759
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
760
        self.assertEqual("revid", new_inv.revision_id)
 
761
        self.assertEqual("directory", new_inv.root.kind)
 
762
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
763
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
764
        self.assertEqual(inv.root.name, new_inv.root.name)
 
765
        self.assertEqual("rootrev", new_inv.root.revision)
 
766
        self.assertEqual('plain', new_inv._search_key_name)
 
767
 
 
768
    def test_deserialise_wrong_revid(self):
 
769
        inv = Inventory()
 
770
        inv.revision_id = "revid"
 
771
        inv.root.revision = "rootrev"
 
772
        chk_bytes = self.get_chk_bytes()
 
773
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
774
        bytes = ''.join(chk_inv.to_lines())
 
775
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
776
            bytes, ("revid2",))
 
777
 
 
778
    def test_captures_rev_root_byid(self):
 
779
        inv = Inventory()
 
780
        inv.revision_id = "foo"
 
781
        inv.root.revision = "bar"
 
782
        chk_bytes = self.get_chk_bytes()
 
783
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
784
        lines = chk_inv.to_lines()
 
785
        self.assertEqual([
 
786
            'chkinventory:\n',
 
787
            'revision_id: foo\n',
 
788
            'root_id: TREE_ROOT\n',
 
789
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
790
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
791
            ], lines)
 
792
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
793
        self.assertEqual('plain', chk_inv._search_key_name)
 
794
 
 
795
    def test_captures_parent_id_basename_index(self):
 
796
        inv = Inventory()
 
797
        inv.revision_id = "foo"
 
798
        inv.root.revision = "bar"
 
799
        chk_bytes = self.get_chk_bytes()
 
800
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
801
        lines = chk_inv.to_lines()
 
802
        self.assertEqual([
 
803
            'chkinventory:\n',
 
804
            'revision_id: foo\n',
 
805
            'root_id: TREE_ROOT\n',
 
806
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
807
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
808
            ], lines)
 
809
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
810
        self.assertEqual('plain', chk_inv._search_key_name)
 
811
 
 
812
    def test_captures_search_key_name(self):
 
813
        inv = Inventory()
 
814
        inv.revision_id = "foo"
 
815
        inv.root.revision = "bar"
 
816
        chk_bytes = self.get_chk_bytes()
 
817
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
818
                                              search_key_name='hash-16-way')
 
819
        lines = chk_inv.to_lines()
 
820
        self.assertEqual([
 
821
            'chkinventory:\n',
 
822
            'search_key_name: hash-16-way\n',
 
823
            'root_id: TREE_ROOT\n',
 
824
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
825
            'revision_id: foo\n',
 
826
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
827
            ], lines)
 
828
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
829
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
830
 
 
831
    def test_directory_children_on_demand(self):
 
832
        inv = Inventory()
 
833
        inv.revision_id = "revid"
 
834
        inv.root.revision = "rootrev"
 
835
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
836
        inv["fileid"].revision = "filerev"
 
837
        inv["fileid"].executable = True
 
838
        inv["fileid"].text_sha1 = "ffff"
 
839
        inv["fileid"].text_size = 1
 
840
        chk_bytes = self.get_chk_bytes()
 
841
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
842
        bytes = ''.join(chk_inv.to_lines())
 
843
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
844
        root_entry = new_inv[inv.root.file_id]
 
845
        self.assertEqual(None, root_entry._children)
 
846
        self.assertEqual(['file'], root_entry.children.keys())
 
847
        file_direct = new_inv["fileid"]
 
848
        file_found = root_entry.children['file']
 
849
        self.assertEqual(file_direct.kind, file_found.kind)
 
850
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
851
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
852
        self.assertEqual(file_direct.name, file_found.name)
 
853
        self.assertEqual(file_direct.revision, file_found.revision)
 
854
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
855
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
856
        self.assertEqual(file_direct.executable, file_found.executable)
 
857
 
 
858
    def test_from_inventory_maximum_size(self):
 
859
        # from_inventory supports the maximum_size parameter.
 
860
        inv = Inventory()
 
861
        inv.revision_id = "revid"
 
862
        inv.root.revision = "rootrev"
 
863
        chk_bytes = self.get_chk_bytes()
 
864
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
865
        chk_inv.id_to_entry._ensure_root()
 
866
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
867
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
868
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
869
        p_id_basename._ensure_root()
 
870
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
871
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
872
 
 
873
    def test___iter__(self):
 
874
        inv = Inventory()
 
875
        inv.revision_id = "revid"
 
876
        inv.root.revision = "rootrev"
 
877
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
878
        inv["fileid"].revision = "filerev"
 
879
        inv["fileid"].executable = True
 
880
        inv["fileid"].text_sha1 = "ffff"
 
881
        inv["fileid"].text_size = 1
 
882
        chk_bytes = self.get_chk_bytes()
 
883
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
884
        bytes = ''.join(chk_inv.to_lines())
 
885
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
886
        fileids = list(new_inv.__iter__())
 
887
        fileids.sort()
 
888
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
889
 
 
890
    def test__len__(self):
 
891
        inv = Inventory()
 
892
        inv.revision_id = "revid"
 
893
        inv.root.revision = "rootrev"
 
894
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
895
        inv["fileid"].revision = "filerev"
 
896
        inv["fileid"].executable = True
 
897
        inv["fileid"].text_sha1 = "ffff"
 
898
        inv["fileid"].text_size = 1
 
899
        chk_bytes = self.get_chk_bytes()
 
900
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
901
        self.assertEqual(2, len(chk_inv))
 
902
 
 
903
    def test___getitem__(self):
 
904
        inv = Inventory()
 
905
        inv.revision_id = "revid"
 
906
        inv.root.revision = "rootrev"
 
907
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
908
        inv["fileid"].revision = "filerev"
 
909
        inv["fileid"].executable = True
 
910
        inv["fileid"].text_sha1 = "ffff"
 
911
        inv["fileid"].text_size = 1
 
912
        chk_bytes = self.get_chk_bytes()
 
913
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
914
        bytes = ''.join(chk_inv.to_lines())
 
915
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
916
        root_entry = new_inv[inv.root.file_id]
 
917
        file_entry = new_inv["fileid"]
 
918
        self.assertEqual("directory", root_entry.kind)
 
919
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
920
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
921
        self.assertEqual(inv.root.name, root_entry.name)
 
922
        self.assertEqual("rootrev", root_entry.revision)
 
923
        self.assertEqual("file", file_entry.kind)
 
924
        self.assertEqual("fileid", file_entry.file_id)
 
925
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
926
        self.assertEqual("file", file_entry.name)
 
927
        self.assertEqual("filerev", file_entry.revision)
 
928
        self.assertEqual("ffff", file_entry.text_sha1)
 
929
        self.assertEqual(1, file_entry.text_size)
 
930
        self.assertEqual(True, file_entry.executable)
 
931
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
932
 
 
933
    def test_has_id_true(self):
 
934
        inv = Inventory()
 
935
        inv.revision_id = "revid"
 
936
        inv.root.revision = "rootrev"
 
937
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
938
        inv["fileid"].revision = "filerev"
 
939
        inv["fileid"].executable = True
 
940
        inv["fileid"].text_sha1 = "ffff"
 
941
        inv["fileid"].text_size = 1
 
942
        chk_bytes = self.get_chk_bytes()
 
943
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
944
        self.assertTrue(chk_inv.has_id('fileid'))
 
945
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
946
 
 
947
    def test_has_id_not(self):
 
948
        inv = Inventory()
 
949
        inv.revision_id = "revid"
 
950
        inv.root.revision = "rootrev"
 
951
        chk_bytes = self.get_chk_bytes()
 
952
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
953
        self.assertFalse(chk_inv.has_id('fileid'))
 
954
 
 
955
    def test_id2path(self):
 
956
        inv = Inventory()
 
957
        inv.revision_id = "revid"
 
958
        inv.root.revision = "rootrev"
 
959
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
960
        fileentry = InventoryFile("fileid", "file", "dirid")
 
961
        inv.add(direntry)
 
962
        inv.add(fileentry)
 
963
        inv["fileid"].revision = "filerev"
 
964
        inv["fileid"].executable = True
 
965
        inv["fileid"].text_sha1 = "ffff"
 
966
        inv["fileid"].text_size = 1
 
967
        inv["dirid"].revision = "filerev"
 
968
        chk_bytes = self.get_chk_bytes()
 
969
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
970
        bytes = ''.join(chk_inv.to_lines())
 
971
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
972
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
973
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
974
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
975
 
 
976
    def test_path2id(self):
 
977
        inv = Inventory()
 
978
        inv.revision_id = "revid"
 
979
        inv.root.revision = "rootrev"
 
980
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
981
        fileentry = InventoryFile("fileid", "file", "dirid")
 
982
        inv.add(direntry)
 
983
        inv.add(fileentry)
 
984
        inv["fileid"].revision = "filerev"
 
985
        inv["fileid"].executable = True
 
986
        inv["fileid"].text_sha1 = "ffff"
 
987
        inv["fileid"].text_size = 1
 
988
        inv["dirid"].revision = "filerev"
 
989
        chk_bytes = self.get_chk_bytes()
 
990
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
991
        bytes = ''.join(chk_inv.to_lines())
 
992
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
993
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
994
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
995
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
996
 
 
997
    def test_create_by_apply_delta_sets_root(self):
 
998
        inv = Inventory()
 
999
        inv.revision_id = "revid"
 
1000
        chk_bytes = self.get_chk_bytes()
 
1001
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1002
        inv.add_path("", "directory", "myrootid", None)
 
1003
        inv.revision_id = "expectedid"
 
1004
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1005
        delta = [("", None, base_inv.root.file_id, None),
 
1006
            (None, "",  "myrootid", inv.root)]
 
1007
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1008
        self.assertEquals(reference_inv.root, new_inv.root)
 
1009
 
 
1010
    def test_create_by_apply_delta_empty_add_child(self):
 
1011
        inv = Inventory()
 
1012
        inv.revision_id = "revid"
 
1013
        inv.root.revision = "rootrev"
 
1014
        chk_bytes = self.get_chk_bytes()
 
1015
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1016
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1017
        a_entry.revision = "filerev"
 
1018
        a_entry.executable = True
 
1019
        a_entry.text_sha1 = "ffff"
 
1020
        a_entry.text_size = 1
 
1021
        inv.add(a_entry)
 
1022
        inv.revision_id = "expectedid"
 
1023
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1024
        delta = [(None, "A",  "A-id", a_entry)]
 
1025
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1026
        # new_inv should be the same as reference_inv.
 
1027
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1028
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1029
        reference_inv.id_to_entry._ensure_root()
 
1030
        new_inv.id_to_entry._ensure_root()
 
1031
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1032
            new_inv.id_to_entry._root_node._key)
 
1033
 
 
1034
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1035
        inv = Inventory()
 
1036
        inv.revision_id = "revid"
 
1037
        inv.root.revision = "rootrev"
 
1038
        chk_bytes = self.get_chk_bytes()
 
1039
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1040
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1041
        a_entry.revision = "filerev"
 
1042
        a_entry.executable = True
 
1043
        a_entry.text_sha1 = "ffff"
 
1044
        a_entry.text_size = 1
 
1045
        inv.add(a_entry)
 
1046
        inv.revision_id = "expectedid"
 
1047
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1048
        delta = [(None, "A",  "A-id", a_entry)]
 
1049
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1050
        reference_inv.id_to_entry._ensure_root()
 
1051
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1052
        new_inv.id_to_entry._ensure_root()
 
1053
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1054
        # new_inv should be the same as reference_inv.
 
1055
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1056
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1057
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1058
            new_inv.id_to_entry._root_node._key)
 
1059
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1060
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1061
 
 
1062
    def test_iter_changes(self):
 
1063
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1064
        # InterTree are coming.
 
1065
        inv = Inventory()
 
1066
        inv.revision_id = "revid"
 
1067
        inv.root.revision = "rootrev"
 
1068
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1069
        inv["fileid"].revision = "filerev"
 
1070
        inv["fileid"].executable = True
 
1071
        inv["fileid"].text_sha1 = "ffff"
 
1072
        inv["fileid"].text_size = 1
 
1073
        inv2 = Inventory()
 
1074
        inv2.revision_id = "revid2"
 
1075
        inv2.root.revision = "rootrev"
 
1076
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1077
        inv2["fileid"].revision = "filerev2"
 
1078
        inv2["fileid"].executable = False
 
1079
        inv2["fileid"].text_sha1 = "bbbb"
 
1080
        inv2["fileid"].text_size = 2
 
1081
        # get fresh objects.
 
1082
        chk_bytes = self.get_chk_bytes()
 
1083
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1084
        bytes = ''.join(chk_inv.to_lines())
 
1085
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1086
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1087
        bytes = ''.join(chk_inv2.to_lines())
 
1088
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1089
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1090
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1091
            (False, True))],
 
1092
            list(inv_1.iter_changes(inv_2)))
 
1093
 
 
1094
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1095
        inv = Inventory()
 
1096
        inv.revision_id = "revid"
 
1097
        inv.root.revision = "rootrev"
 
1098
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1099
        inv["fileid"].revision = "filerev"
 
1100
        inv["fileid"].executable = True
 
1101
        inv["fileid"].text_sha1 = "ffff"
 
1102
        inv["fileid"].text_size = 1
 
1103
        # get fresh objects.
 
1104
        chk_bytes = self.get_chk_bytes()
 
1105
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1106
        bytes = ''.join(tmp_inv.to_lines())
 
1107
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1108
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1109
        self.assertEqual(
 
1110
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1111
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1112
 
 
1113
    def test_file_entry_to_bytes(self):
 
1114
        inv = CHKInventory(None)
 
1115
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
 
1116
        ie.executable = True
 
1117
        ie.revision = 'file-rev-id'
 
1118
        ie.text_sha1 = 'abcdefgh'
 
1119
        ie.text_size = 100
 
1120
        bytes = inv._entry_to_bytes(ie)
 
1121
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
 
1122
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1123
        ie2 = inv._bytes_to_entry(bytes)
 
1124
        self.assertEqual(ie, ie2)
 
1125
        self.assertIsInstance(ie2.name, unicode)
 
1126
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
 
1127
                         inv._bytes_to_utf8name_key(bytes))
 
1128
 
 
1129
    def test_file2_entry_to_bytes(self):
 
1130
        inv = CHKInventory(None)
 
1131
        # \u30a9 == 'omega'
 
1132
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
 
1133
        ie.executable = False
 
1134
        ie.revision = 'file-rev-id'
 
1135
        ie.text_sha1 = '123456'
 
1136
        ie.text_size = 25
 
1137
        bytes = inv._entry_to_bytes(ie)
 
1138
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
 
1139
                         'file-rev-id\n123456\n25\nN', bytes)
 
1140
        ie2 = inv._bytes_to_entry(bytes)
 
1141
        self.assertEqual(ie, ie2)
 
1142
        self.assertIsInstance(ie2.name, unicode)
 
1143
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
 
1144
                         inv._bytes_to_utf8name_key(bytes))
 
1145
 
 
1146
    def test_dir_entry_to_bytes(self):
 
1147
        inv = CHKInventory(None)
 
1148
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
 
1149
        ie.revision = 'dir-rev-id'
 
1150
        bytes = inv._entry_to_bytes(ie)
 
1151
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1152
        ie2 = inv._bytes_to_entry(bytes)
 
1153
        self.assertEqual(ie, ie2)
 
1154
        self.assertIsInstance(ie2.name, unicode)
 
1155
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
 
1156
                         inv._bytes_to_utf8name_key(bytes))
 
1157
 
 
1158
    def test_dir2_entry_to_bytes(self):
 
1159
        inv = CHKInventory(None)
 
1160
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
 
1161
                                          None)
 
1162
        ie.revision = 'dir-rev-id'
 
1163
        bytes = inv._entry_to_bytes(ie)
 
1164
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
 
1165
                         'dir-rev-id', bytes)
 
1166
        ie2 = inv._bytes_to_entry(bytes)
 
1167
        self.assertEqual(ie, ie2)
 
1168
        self.assertIsInstance(ie2.name, unicode)
 
1169
        self.assertIs(ie2.parent_id, None)
 
1170
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
 
1171
                         inv._bytes_to_utf8name_key(bytes))
 
1172
 
 
1173
    def test_symlink_entry_to_bytes(self):
 
1174
        inv = CHKInventory(None)
 
1175
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1176
        ie.revision = 'link-rev-id'
 
1177
        ie.symlink_target = u'target/path'
 
1178
        bytes = inv._entry_to_bytes(ie)
 
1179
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1180
                         'link-rev-id\ntarget/path', bytes)
 
1181
        ie2 = inv._bytes_to_entry(bytes)
 
1182
        self.assertEqual(ie, ie2)
 
1183
        self.assertIsInstance(ie2.name, unicode)
 
1184
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1185
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1186
                         inv._bytes_to_utf8name_key(bytes))
 
1187
 
 
1188
    def test_symlink2_entry_to_bytes(self):
 
1189
        inv = CHKInventory(None)
 
1190
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1191
        ie.revision = 'link-rev-id'
 
1192
        ie.symlink_target = u'target/\u03a9path'
 
1193
        bytes = inv._entry_to_bytes(ie)
 
1194
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1195
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1196
        ie2 = inv._bytes_to_entry(bytes)
 
1197
        self.assertEqual(ie, ie2)
 
1198
        self.assertIsInstance(ie2.name, unicode)
 
1199
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1200
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1201
                         inv._bytes_to_utf8name_key(bytes))
 
1202
 
 
1203
    def test_tree_reference_entry_to_bytes(self):
 
1204
        inv = CHKInventory(None)
 
1205
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1206
                                     'parent-id')
 
1207
        ie.revision = 'tree-rev-id'
 
1208
        ie.reference_revision = 'ref-rev-id'
 
1209
        bytes = inv._entry_to_bytes(ie)
 
1210
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1211
                         'tree-rev-id\nref-rev-id', bytes)
 
1212
        ie2 = inv._bytes_to_entry(bytes)
 
1213
        self.assertEqual(ie, ie2)
 
1214
        self.assertIsInstance(ie2.name, unicode)
 
1215
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1216
                         inv._bytes_to_utf8name_key(bytes))
 
1217
 
 
1218
 
 
1219
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1220
 
 
1221
    def get_chk_bytes(self):
 
1222
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1223
        trans = self.get_transport('')
 
1224
        return factory(trans)
 
1225
 
 
1226
    def make_dir(self, inv, name, parent_id):
 
1227
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1228
 
 
1229
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1230
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1231
        ie.text_sha1 = osutils.sha_string(content)
 
1232
        ie.text_size = len(content)
 
1233
        inv.add(ie)
 
1234
 
 
1235
    def make_simple_inventory(self):
 
1236
        inv = Inventory('TREE_ROOT')
 
1237
        inv.revision_id = "revid"
 
1238
        inv.root.revision = "rootrev"
 
1239
        # /                 TREE_ROOT
 
1240
        # dir1/             dir1-id
 
1241
        #   sub-file1       sub-file1-id
 
1242
        #   sub-file2       sub-file2-id
 
1243
        #   sub-dir1/       sub-dir1-id
 
1244
        #     subsub-file1  subsub-file1-id
 
1245
        # dir2/             dir2-id
 
1246
        #   sub2-file1      sub2-file1-id
 
1247
        # top               top-id
 
1248
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1249
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1250
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1251
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1252
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1253
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1254
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1255
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1256
        chk_bytes = self.get_chk_bytes()
 
1257
        #  use a small maximum_size to force internal paging structures
 
1258
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1259
                        maximum_size=100,
 
1260
                        search_key_name='hash-255-way')
 
1261
        bytes = ''.join(chk_inv.to_lines())
 
1262
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1263
 
 
1264
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1265
        self.assertEqual(sorted(expected_fileids),
 
1266
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1267
 
 
1268
    def assertExpand(self, all_ids, inv, file_ids):
 
1269
        (val_all_ids,
 
1270
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1271
        self.assertEqual(set(all_ids), val_all_ids)
 
1272
        entries = inv._getitems(val_all_ids)
 
1273
        expected_children = {}
 
1274
        for entry in entries:
 
1275
            s = expected_children.setdefault(entry.parent_id, [])
 
1276
            s.append(entry.file_id)
 
1277
        val_children = dict((k, sorted(v)) for k, v
 
1278
                            in val_children.iteritems())
 
1279
        expected_children = dict((k, sorted(v)) for k, v
 
1280
                            in expected_children.iteritems())
 
1281
        self.assertEqual(expected_children, val_children)
 
1282
 
 
1283
    def test_make_simple_inventory(self):
 
1284
        inv = self.make_simple_inventory()
 
1285
        layout = []
 
1286
        for path, entry in inv.iter_entries_by_dir():
 
1287
            layout.append((path, entry.file_id))
 
1288
        self.assertEqual([
 
1289
            ('', 'TREE_ROOT'),
 
1290
            ('dir1', 'dir1-id'),
 
1291
            ('dir2', 'dir2-id'),
 
1292
            ('top', 'top-id'),
 
1293
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1294
            ('dir1/sub-file1', 'sub-file1-id'),
 
1295
            ('dir1/sub-file2', 'sub-file2-id'),
 
1296
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1297
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1298
            ], layout)
 
1299
 
 
1300
    def test__getitems(self):
 
1301
        inv = self.make_simple_inventory()
 
1302
        # Reading from disk
 
1303
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1304
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1305
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1306
        # From cache
 
1307
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1308
        # Mixed
 
1309
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1310
                             ['dir1-id', 'sub-file2-id'])
 
1311
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1312
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1313
 
 
1314
    def test_single_file(self):
 
1315
        inv = self.make_simple_inventory()
 
1316
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1317
 
 
1318
    def test_get_all_parents(self):
 
1319
        inv = self.make_simple_inventory()
 
1320
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1321
                           'subsub-file1-id',
 
1322
                          ], inv, ['subsub-file1-id'])
 
1323
 
 
1324
    def test_get_children(self):
 
1325
        inv = self.make_simple_inventory()
 
1326
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1327
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1328
                          ], inv, ['dir1-id'])
 
1329
 
 
1330
    def test_from_root(self):
 
1331
        inv = self.make_simple_inventory()
 
1332
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1333
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1334
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1335
 
 
1336
    def test_top_level_file(self):
 
1337
        inv = self.make_simple_inventory()
 
1338
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1339
 
 
1340
    def test_subsub_file(self):
 
1341
        inv = self.make_simple_inventory()
 
1342
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1343
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1344
 
 
1345
    def test_sub_and_root(self):
 
1346
        inv = self.make_simple_inventory()
 
1347
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1348
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])