~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/selftest/testinv.py

  • Committer: Martin Pool
  • Date: 2005-07-29 12:29:27 UTC
  • Revision ID: mbp@sourcefrog.net-20050729122927-d51c2cedc14dd5d5
doc

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from bzrlib import (
19
 
    chk_map,
20
 
    groupcompress,
21
 
    errors,
22
 
    inventory,
23
 
    osutils,
24
 
    repository,
25
 
    revision,
26
 
    tests,
27
 
    workingtree,
28
 
    )
29
 
from bzrlib.inventory import (
30
 
    CHKInventory,
31
 
    Inventory,
32
 
    ROOT_ID,
33
 
    InventoryFile,
34
 
    InventoryDirectory,
35
 
    InventoryEntry,
36
 
    TreeReference,
37
 
    )
38
 
from bzrlib.tests import (
39
 
    TestCase,
40
 
    TestCaseWithTransport,
41
 
    )
42
 
from bzrlib.tests.scenarios import load_tests_apply_scenarios
43
 
 
44
 
 
45
 
load_tests = load_tests_apply_scenarios
46
 
 
47
 
 
48
 
def delta_application_scenarios():
49
 
    scenarios = [
50
 
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
51
 
        ]
52
 
    # Working tree basis delta application
53
 
    # Repository add_inv_by_delta.
54
 
    # Reduce form of the per_repository test logic - that logic needs to be
55
 
    # be able to get /just/ repositories whereas these tests are fine with
56
 
    # just creating trees.
57
 
    formats = set()
58
 
    for _, format in repository.format_registry.iteritems():
59
 
        scenarios.append((str(format.__name__), {
60
 
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
61
 
            'format':format}))
62
 
    for format in workingtree.format_registry._get_all():
63
 
        scenarios.append(
64
 
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
65
 
            'apply_delta':apply_inventory_WT_basis,
66
 
            'format':format}))
67
 
        scenarios.append(
68
 
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
69
 
            'apply_delta':apply_inventory_WT,
70
 
            'format':format}))
71
 
    return scenarios
72
 
 
73
 
 
74
 
def create_texts_for_inv(repo, inv):
75
 
    for path, ie in inv.iter_entries():
76
 
        if ie.text_size:
77
 
            lines = ['a' * ie.text_size]
78
 
        else:
79
 
            lines = []
80
 
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
81
 
 
82
 
    
83
 
def apply_inventory_Inventory(self, basis, delta):
84
 
    """Apply delta to basis and return the result.
85
 
    
86
 
    :param basis: An inventory to be used as the basis.
87
 
    :param delta: The inventory delta to apply:
88
 
    :return: An inventory resulting from the application.
89
 
    """
90
 
    basis.apply_delta(delta)
91
 
    return basis
92
 
 
93
 
 
94
 
def apply_inventory_WT(self, basis, delta):
95
 
    """Apply delta to basis and return the result.
96
 
 
97
 
    This sets the tree state to be basis, and then calls apply_inventory_delta.
98
 
    
99
 
    :param basis: An inventory to be used as the basis.
100
 
    :param delta: The inventory delta to apply:
101
 
    :return: An inventory resulting from the application.
102
 
    """
103
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
104
 
    control.create_repository()
105
 
    control.create_branch()
106
 
    tree = self.format.initialize(control)
107
 
    tree.lock_write()
108
 
    try:
109
 
        tree._write_inventory(basis)
110
 
    finally:
111
 
        tree.unlock()
112
 
    # Fresh object, reads disk again.
113
 
    tree = tree.bzrdir.open_workingtree()
114
 
    tree.lock_write()
115
 
    try:
116
 
        tree.apply_inventory_delta(delta)
117
 
    finally:
118
 
        tree.unlock()
119
 
    # reload tree - ensure we get what was written.
120
 
    tree = tree.bzrdir.open_workingtree()
121
 
    tree.lock_read()
122
 
    self.addCleanup(tree.unlock)
123
 
    # One could add 'tree._validate' here but that would cause 'early' failues 
124
 
    # as far as higher level code is concerned. Possibly adding an
125
 
    # expect_fail parameter to this function and if that is False then do a
126
 
    # validate call.
127
 
    return tree.inventory
128
 
 
129
 
 
130
 
def apply_inventory_WT_basis(self, basis, delta):
131
 
    """Apply delta to basis and return the result.
132
 
 
133
 
    This sets the parent and then calls update_basis_by_delta.
134
 
    It also puts the basis in the repository under both 'basis' and 'result' to
135
 
    allow safety checks made by the WT to succeed, and finally ensures that all
136
 
    items in the delta with a new path are present in the WT before calling
137
 
    update_basis_by_delta.
138
 
    
139
 
    :param basis: An inventory to be used as the basis.
140
 
    :param delta: The inventory delta to apply:
141
 
    :return: An inventory resulting from the application.
142
 
    """
143
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
144
 
    control.create_repository()
145
 
    control.create_branch()
146
 
    tree = self.format.initialize(control)
147
 
    tree.lock_write()
148
 
    try:
149
 
        repo = tree.branch.repository
150
 
        repo.start_write_group()
151
 
        try:
152
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
153
 
                message="", committer="foo@example.com")
154
 
            basis.revision_id = 'basis'
155
 
            create_texts_for_inv(tree.branch.repository, basis)
156
 
            repo.add_revision('basis', rev, basis)
157
 
            # Add a revision for the result, with the basis content - 
158
 
            # update_basis_by_delta doesn't check that the delta results in
159
 
            # result, and we want inconsistent deltas to get called on the
160
 
            # tree, or else the code isn't actually checked.
161
 
            rev = revision.Revision('result', timestamp=0, timezone=None,
162
 
                message="", committer="foo@example.com")
163
 
            basis.revision_id = 'result'
164
 
            repo.add_revision('result', rev, basis)
165
 
            repo.commit_write_group()
166
 
        except:
167
 
            repo.abort_write_group()
168
 
            raise
169
 
        # Set the basis state as the trees current state
170
 
        tree._write_inventory(basis)
171
 
        # This reads basis from the repo and puts it into the tree's local
172
 
        # cache, if it has one.
173
 
        tree.set_parent_ids(['basis'])
174
 
        paths = {}
175
 
        parents = set()
176
 
        for old, new, id, entry in delta:
177
 
            if None in (new, entry):
178
 
                continue
179
 
            paths[new] = (entry.file_id, entry.kind)
180
 
            parents.add(osutils.dirname(new))
181
 
        parents = osutils.minimum_path_selection(parents)
182
 
        parents.discard('')
183
 
        # Put place holders in the tree to permit adding the other entries.
184
 
        for pos, parent in enumerate(parents):
185
 
            if not tree.path2id(parent):
186
 
                # add a synthetic directory in the tree so we can can put the
187
 
                # tree0 entries in place for dirstate.
188
 
                tree.add([parent], ["id%d" % pos], ["directory"])
189
 
        if paths:
190
 
            # Many deltas may cause this mini-apply to fail, but we want to see what
191
 
            # the delta application code says, not the prep that we do to deal with 
192
 
            # limitations of dirstate's update_basis code.
193
 
            for path, (file_id, kind) in sorted(paths.items()):
194
 
                try:
195
 
                    tree.add([path], [file_id], [kind])
196
 
                except (KeyboardInterrupt, SystemExit):
197
 
                    raise
198
 
                except:
199
 
                    pass
200
 
    finally:
201
 
        tree.unlock()
202
 
    # Fresh lock, reads disk again.
203
 
    tree.lock_write()
204
 
    try:
205
 
        tree.update_basis_by_delta('result', delta)
206
 
    finally:
207
 
        tree.unlock()
208
 
    # reload tree - ensure we get what was written.
209
 
    tree = tree.bzrdir.open_workingtree()
210
 
    basis_tree = tree.basis_tree()
211
 
    basis_tree.lock_read()
212
 
    self.addCleanup(basis_tree.unlock)
213
 
    # Note, that if the tree does not have a local cache, the trick above of
214
 
    # setting the result as the basis, will come back to bite us. That said,
215
 
    # all the implementations in bzr do have a local cache.
216
 
    return basis_tree.inventory
217
 
 
218
 
 
219
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
220
 
    """Apply delta to basis and return the result.
221
 
    
222
 
    This inserts basis as a whole inventory and then uses
223
 
    add_inventory_by_delta to add delta.
224
 
 
225
 
    :param basis: An inventory to be used as the basis.
226
 
    :param delta: The inventory delta to apply:
227
 
    :return: An inventory resulting from the application.
228
 
    """
229
 
    format = self.format()
230
 
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
231
 
    repo = format.initialize(control)
232
 
    repo.lock_write()
233
 
    try:
234
 
        repo.start_write_group()
235
 
        try:
236
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
237
 
                message="", committer="foo@example.com")
238
 
            basis.revision_id = 'basis'
239
 
            create_texts_for_inv(repo, basis)
240
 
            repo.add_revision('basis', rev, basis)
241
 
            repo.commit_write_group()
242
 
        except:
243
 
            repo.abort_write_group()
244
 
            raise
245
 
    finally:
246
 
        repo.unlock()
247
 
    repo.lock_write()
248
 
    try:
249
 
        repo.start_write_group()
250
 
        try:
251
 
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
252
 
                'result', ['basis'])
253
 
        except:
254
 
            repo.abort_write_group()
255
 
            raise
256
 
        else:
257
 
            repo.commit_write_group()
258
 
    finally:
259
 
        repo.unlock()
260
 
    # Fresh lock, reads disk again.
261
 
    repo = repo.bzrdir.open_repository()
262
 
    repo.lock_read()
263
 
    self.addCleanup(repo.unlock)
264
 
    return repo.get_inventory('result')
265
 
 
266
 
 
267
 
class TestInventoryUpdates(TestCase):
268
 
 
269
 
    def test_creation_from_root_id(self):
270
 
        # iff a root id is passed to the constructor, a root directory is made
271
 
        inv = inventory.Inventory(root_id='tree-root')
272
 
        self.assertNotEqual(None, inv.root)
273
 
        self.assertEqual('tree-root', inv.root.file_id)
274
 
 
275
 
    def test_add_path_of_root(self):
276
 
        # if no root id is given at creation time, there is no root directory
277
 
        inv = inventory.Inventory(root_id=None)
278
 
        self.assertIs(None, inv.root)
279
 
        # add a root entry by adding its path
280
 
        ie = inv.add_path("", "directory", "my-root")
281
 
        ie.revision = 'test-rev'
282
 
        self.assertEqual("my-root", ie.file_id)
283
 
        self.assertIs(ie, inv.root)
284
 
 
285
 
    def test_add_path(self):
286
 
        inv = inventory.Inventory(root_id='tree_root')
287
 
        ie = inv.add_path('hello', 'file', 'hello-id')
288
 
        self.assertEqual('hello-id', ie.file_id)
289
 
        self.assertEqual('file', ie.kind)
290
 
 
291
 
    def test_copy(self):
292
 
        """Make sure copy() works and creates a deep copy."""
293
 
        inv = inventory.Inventory(root_id='some-tree-root')
294
 
        ie = inv.add_path('hello', 'file', 'hello-id')
295
 
        inv2 = inv.copy()
296
 
        inv.root.file_id = 'some-new-root'
297
 
        ie.name = 'file2'
298
 
        self.assertEqual('some-tree-root', inv2.root.file_id)
299
 
        self.assertEqual('hello', inv2['hello-id'].name)
300
 
 
301
 
    def test_copy_empty(self):
302
 
        """Make sure an empty inventory can be copied."""
303
 
        inv = inventory.Inventory(root_id=None)
304
 
        inv2 = inv.copy()
305
 
        self.assertIs(None, inv2.root)
306
 
 
307
 
    def test_copy_copies_root_revision(self):
308
 
        """Make sure the revision of the root gets copied."""
309
 
        inv = inventory.Inventory(root_id='someroot')
310
 
        inv.root.revision = 'therev'
311
 
        inv2 = inv.copy()
312
 
        self.assertEquals('someroot', inv2.root.file_id)
313
 
        self.assertEquals('therev', inv2.root.revision)
314
 
 
315
 
    def test_create_tree_reference(self):
316
 
        inv = inventory.Inventory('tree-root-123')
317
 
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
318
 
                              revision='rev', reference_revision='rev2'))
319
 
 
320
 
    def test_error_encoding(self):
321
 
        inv = inventory.Inventory('tree-root')
322
 
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
323
 
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
324
 
            InventoryFile('b-id', u'\u1234', 'tree-root'))
325
 
        self.assertContainsRe(str(e), r'\\u1234')
326
 
 
327
 
    def test_add_recursive(self):
328
 
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
329
 
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
330
 
        parent.children[child.file_id] = child
331
 
        inv = inventory.Inventory('tree-root')
332
 
        inv.add(parent)
333
 
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
334
 
 
335
 
 
336
 
 
337
 
class TestDeltaApplication(TestCaseWithTransport):
338
 
 
339
 
    scenarios = delta_application_scenarios()
340
 
 
341
 
    def get_empty_inventory(self, reference_inv=None):
342
 
        """Get an empty inventory.
343
 
 
344
 
        Note that tests should not depend on the revision of the root for
345
 
        setting up test conditions, as it has to be flexible to accomodate non
346
 
        rich root repositories.
347
 
 
348
 
        :param reference_inv: If not None, get the revision for the root from
349
 
            this inventory. This is useful for dealing with older repositories
350
 
            that routinely discarded the root entry data. If None, the root's
351
 
            revision is set to 'basis'.
352
 
        """
353
 
        inv = inventory.Inventory()
354
 
        if reference_inv is not None:
355
 
            inv.root.revision = reference_inv.root.revision
356
 
        else:
357
 
            inv.root.revision = 'basis'
358
 
        return inv
359
 
 
360
 
    def test_empty_delta(self):
361
 
        inv = self.get_empty_inventory()
362
 
        delta = []
363
 
        inv = self.apply_delta(self, inv, delta)
364
 
        inv2 = self.get_empty_inventory(inv)
365
 
        self.assertEqual([], inv2._make_delta(inv))
366
 
 
367
 
    def test_None_file_id(self):
368
 
        inv = self.get_empty_inventory()
369
 
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
370
 
        dir1.revision = 'result'
371
 
        delta = [(None, u'dir1', None, dir1)]
372
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
373
 
            inv, delta)
374
 
 
375
 
    def test_unicode_file_id(self):
376
 
        inv = self.get_empty_inventory()
377
 
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
378
 
        dir1.revision = 'result'
379
 
        delta = [(None, u'dir1', dir1.file_id, dir1)]
380
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
381
 
            inv, delta)
382
 
 
383
 
    def test_repeated_file_id(self):
384
 
        inv = self.get_empty_inventory()
385
 
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
386
 
        file1.revision = 'result'
387
 
        file1.text_size = 0
388
 
        file1.text_sha1 = ""
389
 
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
390
 
        file2.revision = 'result'
391
 
        file2.text_size = 0
392
 
        file2.text_sha1 = ""
393
 
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
394
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
395
 
            inv, delta)
396
 
 
397
 
    def test_repeated_new_path(self):
398
 
        inv = self.get_empty_inventory()
399
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
400
 
        file1.revision = 'result'
401
 
        file1.text_size = 0
402
 
        file1.text_sha1 = ""
403
 
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
404
 
        file2.revision = 'result'
405
 
        file2.text_size = 0
406
 
        file2.text_sha1 = ""
407
 
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
408
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
409
 
            inv, delta)
410
 
 
411
 
    def test_repeated_old_path(self):
412
 
        inv = self.get_empty_inventory()
413
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
414
 
        file1.revision = 'result'
415
 
        file1.text_size = 0
416
 
        file1.text_sha1 = ""
417
 
        # We can't *create* a source inventory with the same path, but
418
 
        # a badly generated partial delta might claim the same source twice.
419
 
        # This would be buggy in two ways: the path is repeated in the delta,
420
 
        # And the path for one of the file ids doesn't match the source
421
 
        # location. Alternatively, we could have a repeated fileid, but that
422
 
        # is separately checked for.
423
 
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
424
 
        file2.revision = 'result'
425
 
        file2.text_size = 0
426
 
        file2.text_sha1 = ""
427
 
        inv.add(file1)
428
 
        inv.add(file2)
429
 
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
430
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
431
 
            inv, delta)
432
 
 
433
 
    def test_mismatched_id_entry_id(self):
434
 
        inv = self.get_empty_inventory()
435
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
436
 
        file1.revision = 'result'
437
 
        file1.text_size = 0
438
 
        file1.text_sha1 = ""
439
 
        delta = [(None, u'path', 'id', file1)]
440
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
441
 
            inv, delta)
442
 
 
443
 
    def test_mismatched_new_path_entry_None(self):
444
 
        inv = self.get_empty_inventory()
445
 
        delta = [(None, u'path', 'id', None)]
446
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
447
 
            inv, delta)
448
 
 
449
 
    def test_mismatched_new_path_None_entry(self):
450
 
        inv = self.get_empty_inventory()
451
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
452
 
        file1.revision = 'result'
453
 
        file1.text_size = 0
454
 
        file1.text_sha1 = ""
455
 
        delta = [(u"path", None, 'id1', file1)]
456
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
457
 
            inv, delta)
458
 
 
459
 
    def test_parent_is_not_directory(self):
460
 
        inv = self.get_empty_inventory()
461
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
462
 
        file1.revision = 'result'
463
 
        file1.text_size = 0
464
 
        file1.text_sha1 = ""
465
 
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
466
 
        file2.revision = 'result'
467
 
        file2.text_size = 0
468
 
        file2.text_sha1 = ""
469
 
        inv.add(file1)
470
 
        delta = [(None, u'path/path2', 'id2', file2)]
471
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
472
 
            inv, delta)
473
 
 
474
 
    def test_parent_is_missing(self):
475
 
        inv = self.get_empty_inventory()
476
 
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
477
 
        file2.revision = 'result'
478
 
        file2.text_size = 0
479
 
        file2.text_sha1 = ""
480
 
        delta = [(None, u'path/path2', 'id2', file2)]
481
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
482
 
            inv, delta)
483
 
 
484
 
    def test_new_parent_path_has_wrong_id(self):
485
 
        inv = self.get_empty_inventory()
486
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
487
 
        parent1.revision = 'result'
488
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
489
 
        parent2.revision = 'result'
490
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
491
 
        file1.revision = 'result'
492
 
        file1.text_size = 0
493
 
        file1.text_sha1 = ""
494
 
        inv.add(parent1)
495
 
        inv.add(parent2)
496
 
        # This delta claims that file1 is at dir/path, but actually its at
497
 
        # dir2/path if you follow the inventory parent structure.
498
 
        delta = [(None, u'dir/path', 'id', file1)]
499
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
500
 
            inv, delta)
501
 
 
502
 
    def test_old_parent_path_is_wrong(self):
503
 
        inv = self.get_empty_inventory()
504
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
505
 
        parent1.revision = 'result'
506
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
507
 
        parent2.revision = 'result'
508
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
509
 
        file1.revision = 'result'
510
 
        file1.text_size = 0
511
 
        file1.text_sha1 = ""
512
 
        inv.add(parent1)
513
 
        inv.add(parent2)
514
 
        inv.add(file1)
515
 
        # This delta claims that file1 was at dir/path, but actually it was at
516
 
        # dir2/path if you follow the inventory parent structure.
517
 
        delta = [(u'dir/path', None, 'id', None)]
518
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
519
 
            inv, delta)
520
 
 
521
 
    def test_old_parent_path_is_for_other_id(self):
522
 
        inv = self.get_empty_inventory()
523
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
524
 
        parent1.revision = 'result'
525
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
526
 
        parent2.revision = 'result'
527
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
528
 
        file1.revision = 'result'
529
 
        file1.text_size = 0
530
 
        file1.text_sha1 = ""
531
 
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
532
 
        file2.revision = 'result'
533
 
        file2.text_size = 0
534
 
        file2.text_sha1 = ""
535
 
        inv.add(parent1)
536
 
        inv.add(parent2)
537
 
        inv.add(file1)
538
 
        inv.add(file2)
539
 
        # This delta claims that file1 was at dir/path, but actually it was at
540
 
        # dir2/path if you follow the inventory parent structure. At dir/path
541
 
        # is another entry we should not delete.
542
 
        delta = [(u'dir/path', None, 'id', None)]
543
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
544
 
            inv, delta)
545
 
 
546
 
    def test_add_existing_id_new_path(self):
547
 
        inv = self.get_empty_inventory()
548
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
549
 
        parent1.revision = 'result'
550
 
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
551
 
        parent2.revision = 'result'
552
 
        inv.add(parent1)
553
 
        delta = [(None, u'dir2', 'p-1', parent2)]
554
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
555
 
            inv, delta)
556
 
 
557
 
    def test_add_new_id_existing_path(self):
558
 
        inv = self.get_empty_inventory()
559
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
560
 
        parent1.revision = 'result'
561
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
562
 
        parent2.revision = 'result'
563
 
        inv.add(parent1)
564
 
        delta = [(None, u'dir1', 'p-2', parent2)]
565
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
566
 
            inv, delta)
567
 
 
568
 
    def test_remove_dir_leaving_dangling_child(self):
569
 
        inv = self.get_empty_inventory()
570
 
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
571
 
        dir1.revision = 'result'
572
 
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
573
 
        dir2.revision = 'result'
574
 
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
575
 
        dir3.revision = 'result'
576
 
        inv.add(dir1)
577
 
        inv.add(dir2)
578
 
        inv.add(dir3)
579
 
        delta = [(u'dir1', None, 'p-1', None),
580
 
            (u'dir1/child2', None, 'p-3', None)]
581
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
582
 
            inv, delta)
583
 
 
584
 
 
585
 
class TestInventory(TestCase):
586
 
 
587
 
    def test_is_root(self):
588
 
        """Ensure our root-checking code is accurate."""
589
 
        inv = inventory.Inventory('TREE_ROOT')
590
 
        self.assertTrue(inv.is_root('TREE_ROOT'))
591
 
        self.assertFalse(inv.is_root('booga'))
592
 
        inv.root.file_id = 'booga'
593
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
594
 
        self.assertTrue(inv.is_root('booga'))
595
 
        # works properly even if no root is set
596
 
        inv.root = None
597
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
598
 
        self.assertFalse(inv.is_root('booga'))
599
 
 
600
 
    def test_entries_for_empty_inventory(self):
601
 
        """Test that entries() will not fail for an empty inventory"""
602
 
        inv = Inventory(root_id=None)
603
 
        self.assertEqual([], inv.entries())
604
 
 
605
 
 
606
 
class TestInventoryEntry(TestCase):
607
 
 
608
 
    def test_file_kind_character(self):
609
 
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
610
 
        self.assertEqual(file.kind_character(), '')
611
 
 
612
 
    def test_dir_kind_character(self):
613
 
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
614
 
        self.assertEqual(dir.kind_character(), '/')
615
 
 
616
 
    def test_link_kind_character(self):
617
 
        dir = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
618
 
        self.assertEqual(dir.kind_character(), '')
619
 
 
620
 
    def test_dir_detect_changes(self):
621
 
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
622
 
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
623
 
        self.assertEqual((False, False), left.detect_changes(right))
624
 
        self.assertEqual((False, False), right.detect_changes(left))
625
 
 
626
 
    def test_file_detect_changes(self):
627
 
        left = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
628
 
        left.text_sha1 = 123
629
 
        right = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
630
 
        right.text_sha1 = 123
631
 
        self.assertEqual((False, False), left.detect_changes(right))
632
 
        self.assertEqual((False, False), right.detect_changes(left))
633
 
        left.executable = True
634
 
        self.assertEqual((False, True), left.detect_changes(right))
635
 
        self.assertEqual((False, True), right.detect_changes(left))
636
 
        right.text_sha1 = 321
637
 
        self.assertEqual((True, True), left.detect_changes(right))
638
 
        self.assertEqual((True, True), right.detect_changes(left))
639
 
 
640
 
    def test_symlink_detect_changes(self):
641
 
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
642
 
        left.symlink_target='foo'
643
 
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
644
 
        right.symlink_target='foo'
645
 
        self.assertEqual((False, False), left.detect_changes(right))
646
 
        self.assertEqual((False, False), right.detect_changes(left))
647
 
        left.symlink_target = 'different'
648
 
        self.assertEqual((True, False), left.detect_changes(right))
649
 
        self.assertEqual((True, False), right.detect_changes(left))
650
 
 
651
 
    def test_file_has_text(self):
652
 
        file = inventory.InventoryFile('123', 'hello.c', ROOT_ID)
653
 
        self.failUnless(file.has_text())
654
 
 
655
 
    def test_directory_has_text(self):
656
 
        dir = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
657
 
        self.failIf(dir.has_text())
658
 
 
659
 
    def test_link_has_text(self):
660
 
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
661
 
        self.failIf(link.has_text())
662
 
 
663
 
    def test_make_entry(self):
664
 
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
665
 
            inventory.InventoryFile)
666
 
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
667
 
            inventory.InventoryLink)
668
 
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
669
 
            inventory.InventoryDirectory)
670
 
 
671
 
    def test_make_entry_non_normalized(self):
672
 
        orig_normalized_filename = osutils.normalized_filename
673
 
 
674
 
        try:
675
 
            osutils.normalized_filename = osutils._accessible_normalized_filename
676
 
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
677
 
            self.assertEqual(u'\xe5', entry.name)
678
 
            self.assertIsInstance(entry, inventory.InventoryFile)
679
 
 
680
 
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
681
 
            self.assertRaises(errors.InvalidNormalization,
682
 
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
683
 
        finally:
684
 
            osutils.normalized_filename = orig_normalized_filename
685
 
 
686
 
 
687
 
class TestDescribeChanges(TestCase):
688
 
 
689
 
    def test_describe_change(self):
690
 
        # we need to test the following change combinations:
691
 
        # rename
692
 
        # reparent
693
 
        # modify
694
 
        # gone
695
 
        # added
696
 
        # renamed/reparented and modified
697
 
        # change kind (perhaps can't be done yet?)
698
 
        # also, merged in combination with all of these?
699
 
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
700
 
        old_a.text_sha1 = '123132'
701
 
        old_a.text_size = 0
702
 
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
703
 
        new_a.text_sha1 = '123132'
704
 
        new_a.text_size = 0
705
 
 
706
 
        self.assertChangeDescription('unchanged', old_a, new_a)
707
 
 
708
 
        new_a.text_size = 10
709
 
        new_a.text_sha1 = 'abcabc'
710
 
        self.assertChangeDescription('modified', old_a, new_a)
711
 
 
712
 
        self.assertChangeDescription('added', None, new_a)
713
 
        self.assertChangeDescription('removed', old_a, None)
714
 
        # perhaps a bit questionable but seems like the most reasonable thing...
715
 
        self.assertChangeDescription('unchanged', None, None)
716
 
 
717
 
        # in this case it's both renamed and modified; show a rename and
718
 
        # modification:
719
 
        new_a.name = 'newfilename'
720
 
        self.assertChangeDescription('modified and renamed', old_a, new_a)
721
 
 
722
 
        # reparenting is 'renaming'
723
 
        new_a.name = old_a.name
724
 
        new_a.parent_id = 'somedir-id'
725
 
        self.assertChangeDescription('modified and renamed', old_a, new_a)
726
 
 
727
 
        # reset the content values so its not modified
728
 
        new_a.text_size = old_a.text_size
729
 
        new_a.text_sha1 = old_a.text_sha1
730
 
        new_a.name = old_a.name
731
 
 
732
 
        new_a.name = 'newfilename'
733
 
        self.assertChangeDescription('renamed', old_a, new_a)
734
 
 
735
 
        # reparenting is 'renaming'
736
 
        new_a.name = old_a.name
737
 
        new_a.parent_id = 'somedir-id'
738
 
        self.assertChangeDescription('renamed', old_a, new_a)
739
 
 
740
 
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
741
 
        change = InventoryEntry.describe_change(old_ie, new_ie)
742
 
        self.assertEqual(expected_change, change)
743
 
 
744
 
 
745
 
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
746
 
 
747
 
    def get_chk_bytes(self):
748
 
        factory = groupcompress.make_pack_factory(True, True, 1)
749
 
        trans = self.get_transport('')
750
 
        return factory(trans)
751
 
 
752
 
    def read_bytes(self, chk_bytes, key):
753
 
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
754
 
        return stream.next().get_bytes_as("fulltext")
755
 
 
756
 
    def test_deserialise_gives_CHKInventory(self):
757
 
        inv = Inventory()
758
 
        inv.revision_id = "revid"
759
 
        inv.root.revision = "rootrev"
760
 
        chk_bytes = self.get_chk_bytes()
761
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
762
 
        bytes = ''.join(chk_inv.to_lines())
763
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
764
 
        self.assertEqual("revid", new_inv.revision_id)
765
 
        self.assertEqual("directory", new_inv.root.kind)
766
 
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
767
 
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
768
 
        self.assertEqual(inv.root.name, new_inv.root.name)
769
 
        self.assertEqual("rootrev", new_inv.root.revision)
770
 
        self.assertEqual('plain', new_inv._search_key_name)
771
 
 
772
 
    def test_deserialise_wrong_revid(self):
773
 
        inv = Inventory()
774
 
        inv.revision_id = "revid"
775
 
        inv.root.revision = "rootrev"
776
 
        chk_bytes = self.get_chk_bytes()
777
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
778
 
        bytes = ''.join(chk_inv.to_lines())
779
 
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
780
 
            bytes, ("revid2",))
781
 
 
782
 
    def test_captures_rev_root_byid(self):
783
 
        inv = Inventory()
784
 
        inv.revision_id = "foo"
785
 
        inv.root.revision = "bar"
786
 
        chk_bytes = self.get_chk_bytes()
787
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
788
 
        lines = chk_inv.to_lines()
789
 
        self.assertEqual([
790
 
            'chkinventory:\n',
791
 
            'revision_id: foo\n',
792
 
            'root_id: TREE_ROOT\n',
793
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
794
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
795
 
            ], lines)
796
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
797
 
        self.assertEqual('plain', chk_inv._search_key_name)
798
 
 
799
 
    def test_captures_parent_id_basename_index(self):
800
 
        inv = Inventory()
801
 
        inv.revision_id = "foo"
802
 
        inv.root.revision = "bar"
803
 
        chk_bytes = self.get_chk_bytes()
804
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
805
 
        lines = chk_inv.to_lines()
806
 
        self.assertEqual([
807
 
            'chkinventory:\n',
808
 
            'revision_id: foo\n',
809
 
            'root_id: TREE_ROOT\n',
810
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
811
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
812
 
            ], lines)
813
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
814
 
        self.assertEqual('plain', chk_inv._search_key_name)
815
 
 
816
 
    def test_captures_search_key_name(self):
817
 
        inv = Inventory()
818
 
        inv.revision_id = "foo"
819
 
        inv.root.revision = "bar"
820
 
        chk_bytes = self.get_chk_bytes()
821
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
822
 
                                              search_key_name='hash-16-way')
823
 
        lines = chk_inv.to_lines()
824
 
        self.assertEqual([
825
 
            'chkinventory:\n',
826
 
            'search_key_name: hash-16-way\n',
827
 
            'root_id: TREE_ROOT\n',
828
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
829
 
            'revision_id: foo\n',
830
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
831
 
            ], lines)
832
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
833
 
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
834
 
 
835
 
    def test_directory_children_on_demand(self):
836
 
        inv = Inventory()
837
 
        inv.revision_id = "revid"
838
 
        inv.root.revision = "rootrev"
839
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
840
 
        inv["fileid"].revision = "filerev"
841
 
        inv["fileid"].executable = True
842
 
        inv["fileid"].text_sha1 = "ffff"
843
 
        inv["fileid"].text_size = 1
844
 
        chk_bytes = self.get_chk_bytes()
845
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
846
 
        bytes = ''.join(chk_inv.to_lines())
847
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
848
 
        root_entry = new_inv[inv.root.file_id]
849
 
        self.assertEqual(None, root_entry._children)
850
 
        self.assertEqual(['file'], root_entry.children.keys())
851
 
        file_direct = new_inv["fileid"]
852
 
        file_found = root_entry.children['file']
853
 
        self.assertEqual(file_direct.kind, file_found.kind)
854
 
        self.assertEqual(file_direct.file_id, file_found.file_id)
855
 
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
856
 
        self.assertEqual(file_direct.name, file_found.name)
857
 
        self.assertEqual(file_direct.revision, file_found.revision)
858
 
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
859
 
        self.assertEqual(file_direct.text_size, file_found.text_size)
860
 
        self.assertEqual(file_direct.executable, file_found.executable)
861
 
 
862
 
    def test_from_inventory_maximum_size(self):
863
 
        # from_inventory supports the maximum_size parameter.
864
 
        inv = Inventory()
865
 
        inv.revision_id = "revid"
866
 
        inv.root.revision = "rootrev"
867
 
        chk_bytes = self.get_chk_bytes()
868
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
869
 
        chk_inv.id_to_entry._ensure_root()
870
 
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
871
 
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
872
 
        p_id_basename = chk_inv.parent_id_basename_to_file_id
873
 
        p_id_basename._ensure_root()
874
 
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
875
 
        self.assertEqual(2, p_id_basename._root_node._key_width)
876
 
 
877
 
    def test___iter__(self):
878
 
        inv = Inventory()
879
 
        inv.revision_id = "revid"
880
 
        inv.root.revision = "rootrev"
881
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
882
 
        inv["fileid"].revision = "filerev"
883
 
        inv["fileid"].executable = True
884
 
        inv["fileid"].text_sha1 = "ffff"
885
 
        inv["fileid"].text_size = 1
886
 
        chk_bytes = self.get_chk_bytes()
887
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
888
 
        bytes = ''.join(chk_inv.to_lines())
889
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
890
 
        fileids = list(new_inv.__iter__())
891
 
        fileids.sort()
892
 
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
893
 
 
894
 
    def test__len__(self):
895
 
        inv = Inventory()
896
 
        inv.revision_id = "revid"
897
 
        inv.root.revision = "rootrev"
898
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
899
 
        inv["fileid"].revision = "filerev"
900
 
        inv["fileid"].executable = True
901
 
        inv["fileid"].text_sha1 = "ffff"
902
 
        inv["fileid"].text_size = 1
903
 
        chk_bytes = self.get_chk_bytes()
904
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
905
 
        self.assertEqual(2, len(chk_inv))
906
 
 
907
 
    def test___getitem__(self):
908
 
        inv = Inventory()
909
 
        inv.revision_id = "revid"
910
 
        inv.root.revision = "rootrev"
911
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
912
 
        inv["fileid"].revision = "filerev"
913
 
        inv["fileid"].executable = True
914
 
        inv["fileid"].text_sha1 = "ffff"
915
 
        inv["fileid"].text_size = 1
916
 
        chk_bytes = self.get_chk_bytes()
917
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
918
 
        bytes = ''.join(chk_inv.to_lines())
919
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
920
 
        root_entry = new_inv[inv.root.file_id]
921
 
        file_entry = new_inv["fileid"]
922
 
        self.assertEqual("directory", root_entry.kind)
923
 
        self.assertEqual(inv.root.file_id, root_entry.file_id)
924
 
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
925
 
        self.assertEqual(inv.root.name, root_entry.name)
926
 
        self.assertEqual("rootrev", root_entry.revision)
927
 
        self.assertEqual("file", file_entry.kind)
928
 
        self.assertEqual("fileid", file_entry.file_id)
929
 
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
930
 
        self.assertEqual("file", file_entry.name)
931
 
        self.assertEqual("filerev", file_entry.revision)
932
 
        self.assertEqual("ffff", file_entry.text_sha1)
933
 
        self.assertEqual(1, file_entry.text_size)
934
 
        self.assertEqual(True, file_entry.executable)
935
 
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
936
 
 
937
 
    def test_has_id_true(self):
938
 
        inv = Inventory()
939
 
        inv.revision_id = "revid"
940
 
        inv.root.revision = "rootrev"
941
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
942
 
        inv["fileid"].revision = "filerev"
943
 
        inv["fileid"].executable = True
944
 
        inv["fileid"].text_sha1 = "ffff"
945
 
        inv["fileid"].text_size = 1
946
 
        chk_bytes = self.get_chk_bytes()
947
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
948
 
        self.assertTrue(chk_inv.has_id('fileid'))
949
 
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
950
 
 
951
 
    def test_has_id_not(self):
952
 
        inv = Inventory()
953
 
        inv.revision_id = "revid"
954
 
        inv.root.revision = "rootrev"
955
 
        chk_bytes = self.get_chk_bytes()
956
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
957
 
        self.assertFalse(chk_inv.has_id('fileid'))
958
 
 
959
 
    def test_id2path(self):
960
 
        inv = Inventory()
961
 
        inv.revision_id = "revid"
962
 
        inv.root.revision = "rootrev"
963
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
964
 
        fileentry = InventoryFile("fileid", "file", "dirid")
965
 
        inv.add(direntry)
966
 
        inv.add(fileentry)
967
 
        inv["fileid"].revision = "filerev"
968
 
        inv["fileid"].executable = True
969
 
        inv["fileid"].text_sha1 = "ffff"
970
 
        inv["fileid"].text_size = 1
971
 
        inv["dirid"].revision = "filerev"
972
 
        chk_bytes = self.get_chk_bytes()
973
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
974
 
        bytes = ''.join(chk_inv.to_lines())
975
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
976
 
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
977
 
        self.assertEqual('dir', new_inv.id2path('dirid'))
978
 
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
979
 
 
980
 
    def test_path2id(self):
981
 
        inv = Inventory()
982
 
        inv.revision_id = "revid"
983
 
        inv.root.revision = "rootrev"
984
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
985
 
        fileentry = InventoryFile("fileid", "file", "dirid")
986
 
        inv.add(direntry)
987
 
        inv.add(fileentry)
988
 
        inv["fileid"].revision = "filerev"
989
 
        inv["fileid"].executable = True
990
 
        inv["fileid"].text_sha1 = "ffff"
991
 
        inv["fileid"].text_size = 1
992
 
        inv["dirid"].revision = "filerev"
993
 
        chk_bytes = self.get_chk_bytes()
994
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
995
 
        bytes = ''.join(chk_inv.to_lines())
996
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
997
 
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
998
 
        self.assertEqual('dirid', new_inv.path2id('dir'))
999
 
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
1000
 
 
1001
 
    def test_create_by_apply_delta_sets_root(self):
1002
 
        inv = Inventory()
1003
 
        inv.revision_id = "revid"
1004
 
        chk_bytes = self.get_chk_bytes()
1005
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1006
 
        inv.add_path("", "directory", "myrootid", None)
1007
 
        inv.revision_id = "expectedid"
1008
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1009
 
        delta = [("", None, base_inv.root.file_id, None),
1010
 
            (None, "",  "myrootid", inv.root)]
1011
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1012
 
        self.assertEquals(reference_inv.root, new_inv.root)
1013
 
 
1014
 
    def test_create_by_apply_delta_empty_add_child(self):
1015
 
        inv = Inventory()
1016
 
        inv.revision_id = "revid"
1017
 
        inv.root.revision = "rootrev"
1018
 
        chk_bytes = self.get_chk_bytes()
1019
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1020
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1021
 
        a_entry.revision = "filerev"
1022
 
        a_entry.executable = True
1023
 
        a_entry.text_sha1 = "ffff"
1024
 
        a_entry.text_size = 1
1025
 
        inv.add(a_entry)
1026
 
        inv.revision_id = "expectedid"
1027
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1028
 
        delta = [(None, "A",  "A-id", a_entry)]
1029
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1030
 
        # new_inv should be the same as reference_inv.
1031
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1032
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1033
 
        reference_inv.id_to_entry._ensure_root()
1034
 
        new_inv.id_to_entry._ensure_root()
1035
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1036
 
            new_inv.id_to_entry._root_node._key)
1037
 
 
1038
 
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1039
 
        inv = Inventory()
1040
 
        inv.revision_id = "revid"
1041
 
        inv.root.revision = "rootrev"
1042
 
        chk_bytes = self.get_chk_bytes()
1043
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1044
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1045
 
        a_entry.revision = "filerev"
1046
 
        a_entry.executable = True
1047
 
        a_entry.text_sha1 = "ffff"
1048
 
        a_entry.text_size = 1
1049
 
        inv.add(a_entry)
1050
 
        inv.revision_id = "expectedid"
1051
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1052
 
        delta = [(None, "A",  "A-id", a_entry)]
1053
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1054
 
        reference_inv.id_to_entry._ensure_root()
1055
 
        reference_inv.parent_id_basename_to_file_id._ensure_root()
1056
 
        new_inv.id_to_entry._ensure_root()
1057
 
        new_inv.parent_id_basename_to_file_id._ensure_root()
1058
 
        # new_inv should be the same as reference_inv.
1059
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1060
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1061
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1062
 
            new_inv.id_to_entry._root_node._key)
1063
 
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1064
 
            new_inv.parent_id_basename_to_file_id._root_node._key)
1065
 
 
1066
 
    def test_iter_changes(self):
1067
 
        # Low level bootstrapping smoke test; comprehensive generic tests via
1068
 
        # InterTree are coming.
1069
 
        inv = Inventory()
1070
 
        inv.revision_id = "revid"
1071
 
        inv.root.revision = "rootrev"
1072
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1073
 
        inv["fileid"].revision = "filerev"
1074
 
        inv["fileid"].executable = True
1075
 
        inv["fileid"].text_sha1 = "ffff"
1076
 
        inv["fileid"].text_size = 1
1077
 
        inv2 = Inventory()
1078
 
        inv2.revision_id = "revid2"
1079
 
        inv2.root.revision = "rootrev"
1080
 
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1081
 
        inv2["fileid"].revision = "filerev2"
1082
 
        inv2["fileid"].executable = False
1083
 
        inv2["fileid"].text_sha1 = "bbbb"
1084
 
        inv2["fileid"].text_size = 2
1085
 
        # get fresh objects.
1086
 
        chk_bytes = self.get_chk_bytes()
1087
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1088
 
        bytes = ''.join(chk_inv.to_lines())
1089
 
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1090
 
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1091
 
        bytes = ''.join(chk_inv2.to_lines())
1092
 
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1093
 
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1094
 
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1095
 
            (False, True))],
1096
 
            list(inv_1.iter_changes(inv_2)))
1097
 
 
1098
 
    def test_parent_id_basename_to_file_id_index_enabled(self):
1099
 
        inv = Inventory()
1100
 
        inv.revision_id = "revid"
1101
 
        inv.root.revision = "rootrev"
1102
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1103
 
        inv["fileid"].revision = "filerev"
1104
 
        inv["fileid"].executable = True
1105
 
        inv["fileid"].text_sha1 = "ffff"
1106
 
        inv["fileid"].text_size = 1
1107
 
        # get fresh objects.
1108
 
        chk_bytes = self.get_chk_bytes()
1109
 
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1110
 
        bytes = ''.join(tmp_inv.to_lines())
1111
 
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1112
 
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1113
 
        self.assertEqual(
1114
 
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1115
 
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1116
 
 
1117
 
    def test_file_entry_to_bytes(self):
1118
 
        inv = CHKInventory(None)
1119
 
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1120
 
        ie.executable = True
1121
 
        ie.revision = 'file-rev-id'
1122
 
        ie.text_sha1 = 'abcdefgh'
1123
 
        ie.text_size = 100
1124
 
        bytes = inv._entry_to_bytes(ie)
1125
 
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
1126
 
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
1127
 
        ie2 = inv._bytes_to_entry(bytes)
1128
 
        self.assertEqual(ie, ie2)
1129
 
        self.assertIsInstance(ie2.name, unicode)
1130
 
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1131
 
                         inv._bytes_to_utf8name_key(bytes))
1132
 
 
1133
 
    def test_file2_entry_to_bytes(self):
1134
 
        inv = CHKInventory(None)
1135
 
        # \u30a9 == 'omega'
1136
 
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1137
 
        ie.executable = False
1138
 
        ie.revision = 'file-rev-id'
1139
 
        ie.text_sha1 = '123456'
1140
 
        ie.text_size = 25
1141
 
        bytes = inv._entry_to_bytes(ie)
1142
 
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1143
 
                         'file-rev-id\n123456\n25\nN', bytes)
1144
 
        ie2 = inv._bytes_to_entry(bytes)
1145
 
        self.assertEqual(ie, ie2)
1146
 
        self.assertIsInstance(ie2.name, unicode)
1147
 
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1148
 
                         inv._bytes_to_utf8name_key(bytes))
1149
 
 
1150
 
    def test_dir_entry_to_bytes(self):
1151
 
        inv = CHKInventory(None)
1152
 
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1153
 
        ie.revision = 'dir-rev-id'
1154
 
        bytes = inv._entry_to_bytes(ie)
1155
 
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1156
 
        ie2 = inv._bytes_to_entry(bytes)
1157
 
        self.assertEqual(ie, ie2)
1158
 
        self.assertIsInstance(ie2.name, unicode)
1159
 
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1160
 
                         inv._bytes_to_utf8name_key(bytes))
1161
 
 
1162
 
    def test_dir2_entry_to_bytes(self):
1163
 
        inv = CHKInventory(None)
1164
 
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1165
 
                                          None)
1166
 
        ie.revision = 'dir-rev-id'
1167
 
        bytes = inv._entry_to_bytes(ie)
1168
 
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1169
 
                         'dir-rev-id', bytes)
1170
 
        ie2 = inv._bytes_to_entry(bytes)
1171
 
        self.assertEqual(ie, ie2)
1172
 
        self.assertIsInstance(ie2.name, unicode)
1173
 
        self.assertIs(ie2.parent_id, None)
1174
 
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1175
 
                         inv._bytes_to_utf8name_key(bytes))
1176
 
 
1177
 
    def test_symlink_entry_to_bytes(self):
1178
 
        inv = CHKInventory(None)
1179
 
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1180
 
        ie.revision = 'link-rev-id'
1181
 
        ie.symlink_target = u'target/path'
1182
 
        bytes = inv._entry_to_bytes(ie)
1183
 
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1184
 
                         'link-rev-id\ntarget/path', bytes)
1185
 
        ie2 = inv._bytes_to_entry(bytes)
1186
 
        self.assertEqual(ie, ie2)
1187
 
        self.assertIsInstance(ie2.name, unicode)
1188
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1189
 
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1190
 
                         inv._bytes_to_utf8name_key(bytes))
1191
 
 
1192
 
    def test_symlink2_entry_to_bytes(self):
1193
 
        inv = CHKInventory(None)
1194
 
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1195
 
        ie.revision = 'link-rev-id'
1196
 
        ie.symlink_target = u'target/\u03a9path'
1197
 
        bytes = inv._entry_to_bytes(ie)
1198
 
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1199
 
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
1200
 
        ie2 = inv._bytes_to_entry(bytes)
1201
 
        self.assertEqual(ie, ie2)
1202
 
        self.assertIsInstance(ie2.name, unicode)
1203
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1204
 
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1205
 
                         inv._bytes_to_utf8name_key(bytes))
1206
 
 
1207
 
    def test_tree_reference_entry_to_bytes(self):
1208
 
        inv = CHKInventory(None)
1209
 
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1210
 
                                     'parent-id')
1211
 
        ie.revision = 'tree-rev-id'
1212
 
        ie.reference_revision = 'ref-rev-id'
1213
 
        bytes = inv._entry_to_bytes(ie)
1214
 
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1215
 
                         'tree-rev-id\nref-rev-id', bytes)
1216
 
        ie2 = inv._bytes_to_entry(bytes)
1217
 
        self.assertEqual(ie, ie2)
1218
 
        self.assertIsInstance(ie2.name, unicode)
1219
 
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1220
 
                         inv._bytes_to_utf8name_key(bytes))
1221
 
 
1222
 
 
1223
 
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1224
 
 
1225
 
    def get_chk_bytes(self):
1226
 
        factory = groupcompress.make_pack_factory(True, True, 1)
1227
 
        trans = self.get_transport('')
1228
 
        return factory(trans)
1229
 
 
1230
 
    def make_dir(self, inv, name, parent_id):
1231
 
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1232
 
 
1233
 
    def make_file(self, inv, name, parent_id, content='content\n'):
1234
 
        ie = inv.make_entry('file', name, parent_id, name + '-id')
1235
 
        ie.text_sha1 = osutils.sha_string(content)
1236
 
        ie.text_size = len(content)
1237
 
        inv.add(ie)
1238
 
 
1239
 
    def make_simple_inventory(self):
1240
 
        inv = Inventory('TREE_ROOT')
1241
 
        inv.revision_id = "revid"
1242
 
        inv.root.revision = "rootrev"
1243
 
        # /                 TREE_ROOT
1244
 
        # dir1/             dir1-id
1245
 
        #   sub-file1       sub-file1-id
1246
 
        #   sub-file2       sub-file2-id
1247
 
        #   sub-dir1/       sub-dir1-id
1248
 
        #     subsub-file1  subsub-file1-id
1249
 
        # dir2/             dir2-id
1250
 
        #   sub2-file1      sub2-file1-id
1251
 
        # top               top-id
1252
 
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
1253
 
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
1254
 
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
1255
 
        self.make_file(inv, 'top', 'TREE_ROOT')
1256
 
        self.make_file(inv, 'sub-file1', 'dir1-id')
1257
 
        self.make_file(inv, 'sub-file2', 'dir1-id')
1258
 
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1259
 
        self.make_file(inv, 'sub2-file1', 'dir2-id')
1260
 
        chk_bytes = self.get_chk_bytes()
1261
 
        #  use a small maximum_size to force internal paging structures
1262
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1263
 
                        maximum_size=100,
1264
 
                        search_key_name='hash-255-way')
1265
 
        bytes = ''.join(chk_inv.to_lines())
1266
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1267
 
 
1268
 
    def assert_Getitems(self, expected_fileids, inv, file_ids):
1269
 
        self.assertEqual(sorted(expected_fileids),
1270
 
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1271
 
 
1272
 
    def assertExpand(self, all_ids, inv, file_ids):
1273
 
        (val_all_ids,
1274
 
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1275
 
        self.assertEqual(set(all_ids), val_all_ids)
1276
 
        entries = inv._getitems(val_all_ids)
1277
 
        expected_children = {}
1278
 
        for entry in entries:
1279
 
            s = expected_children.setdefault(entry.parent_id, [])
1280
 
            s.append(entry.file_id)
1281
 
        val_children = dict((k, sorted(v)) for k, v
1282
 
                            in val_children.iteritems())
1283
 
        expected_children = dict((k, sorted(v)) for k, v
1284
 
                            in expected_children.iteritems())
1285
 
        self.assertEqual(expected_children, val_children)
1286
 
 
1287
 
    def test_make_simple_inventory(self):
1288
 
        inv = self.make_simple_inventory()
1289
 
        layout = []
1290
 
        for path, entry in inv.iter_entries_by_dir():
1291
 
            layout.append((path, entry.file_id))
1292
 
        self.assertEqual([
1293
 
            ('', 'TREE_ROOT'),
1294
 
            ('dir1', 'dir1-id'),
1295
 
            ('dir2', 'dir2-id'),
1296
 
            ('top', 'top-id'),
1297
 
            ('dir1/sub-dir1', 'sub-dir1-id'),
1298
 
            ('dir1/sub-file1', 'sub-file1-id'),
1299
 
            ('dir1/sub-file2', 'sub-file2-id'),
1300
 
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1301
 
            ('dir2/sub2-file1', 'sub2-file1-id'),
1302
 
            ], layout)
1303
 
 
1304
 
    def test__getitems(self):
1305
 
        inv = self.make_simple_inventory()
1306
 
        # Reading from disk
1307
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1308
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1309
 
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1310
 
        # From cache
1311
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1312
 
        # Mixed
1313
 
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1314
 
                             ['dir1-id', 'sub-file2-id'])
1315
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1316
 
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1317
 
 
1318
 
    def test_single_file(self):
1319
 
        inv = self.make_simple_inventory()
1320
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1321
 
 
1322
 
    def test_get_all_parents(self):
1323
 
        inv = self.make_simple_inventory()
1324
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1325
 
                           'subsub-file1-id',
1326
 
                          ], inv, ['subsub-file1-id'])
1327
 
 
1328
 
    def test_get_children(self):
1329
 
        inv = self.make_simple_inventory()
1330
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1331
 
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1332
 
                          ], inv, ['dir1-id'])
1333
 
 
1334
 
    def test_from_root(self):
1335
 
        inv = self.make_simple_inventory()
1336
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1337
 
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1338
 
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1339
 
 
1340
 
    def test_top_level_file(self):
1341
 
        inv = self.make_simple_inventory()
1342
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1343
 
 
1344
 
    def test_subsub_file(self):
1345
 
        inv = self.make_simple_inventory()
1346
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1347
 
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
1348
 
 
1349
 
    def test_sub_and_root(self):
1350
 
        inv = self.make_simple_inventory()
1351
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1352
 
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from bzrlib.selftest import TestBase
 
18
 
 
19
from bzrlib.inventory import Inventory, InventoryEntry
 
20
 
 
21
 
 
22
class TestIsWithin(TestBase):
 
23
    def runTest(self):
 
24
        from bzrlib.osutils import is_inside_any
 
25
        
 
26
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
27
                         (['src'], 'src/foo.c'),
 
28
                         (['src'], 'src'),
 
29
                         ]:
 
30
            self.assert_(is_inside_any(dirs, fn))
 
31
            
 
32
        for dirs, fn in [(['src'], 'srccontrol'),
 
33
                         (['src'], 'srccontrol/foo')]:
 
34
            self.assertFalse(is_inside_any(dirs, fn))
 
35
            
 
36
            
 
37
            
 
38
class TestInventoryIds(TestBase):
 
39
    def runTest(self):
 
40
        """Test detection of files within selected directories."""
 
41
        inv = Inventory()
 
42
        
 
43
        for args in [('src', 'directory', 'src-id'), 
 
44
                     ('doc', 'directory', 'doc-id'), 
 
45
                     ('src/hello.c', 'file'),
 
46
                     ('src/bye.c', 'file', 'bye-id'),
 
47
                     ('Makefile', 'file')]:
 
48
            inv.add_path(*args)
 
49
            
 
50
        self.assertEqual(inv.path2id('src'), 'src-id')
 
51
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
 
52
        
 
53
        self.assert_('src-id' in inv)