~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-07-12 12:36:57 UTC
  • mfrom: (1732.3.4 bzr.revnoX)
  • Revision ID: pqm@pqm.ubuntu.com-20060712123657-365eeb32b69308bf
(matthieu) revno:x:url revision spec

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
 
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
#
 
7
 
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
#
 
12
 
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from bzrlib import (
19
 
    chk_map,
20
 
    groupcompress,
21
 
    errors,
22
 
    inventory,
23
 
    osutils,
24
 
    repository,
25
 
    revision,
26
 
    tests,
27
 
    )
28
 
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
29
 
    InventoryDirectory, InventoryEntry, TreeReference)
30
 
from bzrlib.tests import (
31
 
    TestCase,
32
 
    TestCaseWithTransport,
33
 
    condition_isinstance,
34
 
    multiply_tests,
35
 
    split_suite_by_condition,
36
 
    )
37
 
from bzrlib.tests.per_workingtree import workingtree_formats
38
 
 
39
 
 
40
 
def load_tests(standard_tests, module, loader):
41
 
    """Parameterise some inventory tests."""
42
 
    to_adapt, result = split_suite_by_condition(standard_tests,
43
 
        condition_isinstance(TestDeltaApplication))
44
 
    scenarios = [
45
 
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
46
 
        ]
47
 
    # Working tree basis delta application
48
 
    # Repository add_inv_by_delta.
49
 
    # Reduce form of the per_repository test logic - that logic needs to be
50
 
    # be able to get /just/ repositories whereas these tests are fine with
51
 
    # just creating trees.
52
 
    formats = set()
53
 
    for _, format in repository.format_registry.iteritems():
54
 
        scenarios.append((str(format.__name__), {
55
 
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
56
 
            'format':format}))
57
 
    for format in workingtree_formats():
58
 
        scenarios.append(
59
 
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
60
 
            'apply_delta':apply_inventory_WT_basis,
61
 
            'format':format}))
62
 
        scenarios.append(
63
 
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
64
 
            'apply_delta':apply_inventory_WT,
65
 
            'format':format}))
66
 
    return multiply_tests(to_adapt, scenarios, result)
67
 
 
68
 
 
69
 
def create_texts_for_inv(repo, inv):
70
 
    for path, ie in inv.iter_entries():
71
 
        if ie.text_size:
72
 
            lines = ['a' * ie.text_size]
73
 
        else:
74
 
            lines = []
75
 
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
76
 
    
77
 
def apply_inventory_Inventory(self, basis, delta):
78
 
    """Apply delta to basis and return the result.
79
 
    
80
 
    :param basis: An inventory to be used as the basis.
81
 
    :param delta: The inventory delta to apply:
82
 
    :return: An inventory resulting from the application.
83
 
    """
84
 
    basis.apply_delta(delta)
85
 
    return basis
86
 
 
87
 
 
88
 
def apply_inventory_WT(self, basis, delta):
89
 
    """Apply delta to basis and return the result.
90
 
 
91
 
    This sets the tree state to be basis, and then calls apply_inventory_delta.
92
 
    
93
 
    :param basis: An inventory to be used as the basis.
94
 
    :param delta: The inventory delta to apply:
95
 
    :return: An inventory resulting from the application.
96
 
    """
97
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
98
 
    control.create_repository()
99
 
    control.create_branch()
100
 
    tree = self.format.initialize(control)
101
 
    tree.lock_write()
102
 
    try:
103
 
        tree._write_inventory(basis)
104
 
    finally:
105
 
        tree.unlock()
106
 
    # Fresh object, reads disk again.
107
 
    tree = tree.bzrdir.open_workingtree()
108
 
    tree.lock_write()
109
 
    try:
110
 
        tree.apply_inventory_delta(delta)
111
 
    finally:
112
 
        tree.unlock()
113
 
    # reload tree - ensure we get what was written.
114
 
    tree = tree.bzrdir.open_workingtree()
115
 
    tree.lock_read()
116
 
    self.addCleanup(tree.unlock)
117
 
    # One could add 'tree._validate' here but that would cause 'early' failues 
118
 
    # as far as higher level code is concerned. Possibly adding an
119
 
    # expect_fail parameter to this function and if that is False then do a
120
 
    # validate call.
121
 
    return tree.inventory
122
 
 
123
 
 
124
 
def apply_inventory_WT_basis(self, basis, delta):
125
 
    """Apply delta to basis and return the result.
126
 
 
127
 
    This sets the parent and then calls update_basis_by_delta.
128
 
    It also puts the basis in the repository under both 'basis' and 'result' to
129
 
    allow safety checks made by the WT to succeed, and finally ensures that all
130
 
    items in the delta with a new path are present in the WT before calling
131
 
    update_basis_by_delta.
132
 
    
133
 
    :param basis: An inventory to be used as the basis.
134
 
    :param delta: The inventory delta to apply:
135
 
    :return: An inventory resulting from the application.
136
 
    """
137
 
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
138
 
    control.create_repository()
139
 
    control.create_branch()
140
 
    tree = self.format.initialize(control)
141
 
    tree.lock_write()
142
 
    try:
143
 
        repo = tree.branch.repository
144
 
        repo.start_write_group()
145
 
        try:
146
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
147
 
                message="", committer="foo@example.com")
148
 
            basis.revision_id = 'basis'
149
 
            create_texts_for_inv(tree.branch.repository, basis)
150
 
            repo.add_revision('basis', rev, basis)
151
 
            # Add a revision for the result, with the basis content - 
152
 
            # update_basis_by_delta doesn't check that the delta results in
153
 
            # result, and we want inconsistent deltas to get called on the
154
 
            # tree, or else the code isn't actually checked.
155
 
            rev = revision.Revision('result', timestamp=0, timezone=None,
156
 
                message="", committer="foo@example.com")
157
 
            basis.revision_id = 'result'
158
 
            repo.add_revision('result', rev, basis)
159
 
            repo.commit_write_group()
160
 
        except:
161
 
            repo.abort_write_group()
162
 
            raise
163
 
        # Set the basis state as the trees current state
164
 
        tree._write_inventory(basis)
165
 
        # This reads basis from the repo and puts it into the tree's local
166
 
        # cache, if it has one.
167
 
        tree.set_parent_ids(['basis'])
168
 
        paths = {}
169
 
        parents = set()
170
 
        for old, new, id, entry in delta:
171
 
            if None in (new, entry):
172
 
                continue
173
 
            paths[new] = (entry.file_id, entry.kind)
174
 
            parents.add(osutils.dirname(new))
175
 
        parents = osutils.minimum_path_selection(parents)
176
 
        parents.discard('')
177
 
        # Put place holders in the tree to permit adding the other entries.
178
 
        for pos, parent in enumerate(parents):
179
 
            if not tree.path2id(parent):
180
 
                # add a synthetic directory in the tree so we can can put the
181
 
                # tree0 entries in place for dirstate.
182
 
                tree.add([parent], ["id%d" % pos], ["directory"])
183
 
        if paths:
184
 
            # Many deltas may cause this mini-apply to fail, but we want to see what
185
 
            # the delta application code says, not the prep that we do to deal with 
186
 
            # limitations of dirstate's update_basis code.
187
 
            for path, (file_id, kind) in sorted(paths.items()):
188
 
                try:
189
 
                    tree.add([path], [file_id], [kind])
190
 
                except (KeyboardInterrupt, SystemExit):
191
 
                    raise
192
 
                except:
193
 
                    pass
194
 
    finally:
195
 
        tree.unlock()
196
 
    # Fresh lock, reads disk again.
197
 
    tree.lock_write()
198
 
    try:
199
 
        tree.update_basis_by_delta('result', delta)
200
 
    finally:
201
 
        tree.unlock()
202
 
    # reload tree - ensure we get what was written.
203
 
    tree = tree.bzrdir.open_workingtree()
204
 
    basis_tree = tree.basis_tree()
205
 
    basis_tree.lock_read()
206
 
    self.addCleanup(basis_tree.unlock)
207
 
    # Note, that if the tree does not have a local cache, the trick above of
208
 
    # setting the result as the basis, will come back to bite us. That said,
209
 
    # all the implementations in bzr do have a local cache.
210
 
    return basis_tree.inventory
211
 
 
212
 
 
213
 
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
214
 
    """Apply delta to basis and return the result.
215
 
    
216
 
    This inserts basis as a whole inventory and then uses
217
 
    add_inventory_by_delta to add delta.
218
 
 
219
 
    :param basis: An inventory to be used as the basis.
220
 
    :param delta: The inventory delta to apply:
221
 
    :return: An inventory resulting from the application.
222
 
    """
223
 
    format = self.format()
224
 
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
225
 
    repo = format.initialize(control)
226
 
    repo.lock_write()
227
 
    try:
228
 
        repo.start_write_group()
229
 
        try:
230
 
            rev = revision.Revision('basis', timestamp=0, timezone=None,
231
 
                message="", committer="foo@example.com")
232
 
            basis.revision_id = 'basis'
233
 
            create_texts_for_inv(repo, basis)
234
 
            repo.add_revision('basis', rev, basis)
235
 
            repo.commit_write_group()
236
 
        except:
237
 
            repo.abort_write_group()
238
 
            raise
239
 
    finally:
240
 
        repo.unlock()
241
 
    repo.lock_write()
242
 
    try:
243
 
        repo.start_write_group()
244
 
        try:
245
 
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
246
 
                'result', ['basis'])
247
 
        except:
248
 
            repo.abort_write_group()
249
 
            raise
250
 
        else:
251
 
            repo.commit_write_group()
252
 
    finally:
253
 
        repo.unlock()
254
 
    # Fresh lock, reads disk again.
255
 
    repo = repo.bzrdir.open_repository()
256
 
    repo.lock_read()
257
 
    self.addCleanup(repo.unlock)
258
 
    return repo.get_inventory('result')
259
 
 
260
 
 
261
 
class TestInventoryUpdates(TestCase):
262
 
 
263
 
    def test_creation_from_root_id(self):
264
 
        # iff a root id is passed to the constructor, a root directory is made
265
 
        inv = inventory.Inventory(root_id='tree-root')
266
 
        self.assertNotEqual(None, inv.root)
267
 
        self.assertEqual('tree-root', inv.root.file_id)
268
 
 
269
 
    def test_add_path_of_root(self):
270
 
        # if no root id is given at creation time, there is no root directory
271
 
        inv = inventory.Inventory(root_id=None)
272
 
        self.assertIs(None, inv.root)
273
 
        # add a root entry by adding its path
274
 
        ie = inv.add_path("", "directory", "my-root")
275
 
        ie.revision = 'test-rev'
276
 
        self.assertEqual("my-root", ie.file_id)
277
 
        self.assertIs(ie, inv.root)
278
 
 
279
 
    def test_add_path(self):
280
 
        inv = inventory.Inventory(root_id='tree_root')
281
 
        ie = inv.add_path('hello', 'file', 'hello-id')
282
 
        self.assertEqual('hello-id', ie.file_id)
283
 
        self.assertEqual('file', ie.kind)
284
 
 
285
 
    def test_copy(self):
286
 
        """Make sure copy() works and creates a deep copy."""
287
 
        inv = inventory.Inventory(root_id='some-tree-root')
288
 
        ie = inv.add_path('hello', 'file', 'hello-id')
289
 
        inv2 = inv.copy()
290
 
        inv.root.file_id = 'some-new-root'
291
 
        ie.name = 'file2'
292
 
        self.assertEqual('some-tree-root', inv2.root.file_id)
293
 
        self.assertEqual('hello', inv2['hello-id'].name)
294
 
 
295
 
    def test_copy_empty(self):
296
 
        """Make sure an empty inventory can be copied."""
297
 
        inv = inventory.Inventory(root_id=None)
298
 
        inv2 = inv.copy()
299
 
        self.assertIs(None, inv2.root)
300
 
 
301
 
    def test_copy_copies_root_revision(self):
302
 
        """Make sure the revision of the root gets copied."""
303
 
        inv = inventory.Inventory(root_id='someroot')
304
 
        inv.root.revision = 'therev'
305
 
        inv2 = inv.copy()
306
 
        self.assertEquals('someroot', inv2.root.file_id)
307
 
        self.assertEquals('therev', inv2.root.revision)
308
 
 
309
 
    def test_create_tree_reference(self):
310
 
        inv = inventory.Inventory('tree-root-123')
311
 
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
312
 
                              revision='rev', reference_revision='rev2'))
313
 
 
314
 
    def test_error_encoding(self):
315
 
        inv = inventory.Inventory('tree-root')
316
 
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
317
 
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
318
 
            InventoryFile('b-id', u'\u1234', 'tree-root'))
319
 
        self.assertContainsRe(str(e), r'\\u1234')
320
 
 
321
 
    def test_add_recursive(self):
322
 
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
323
 
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
324
 
        parent.children[child.file_id] = child
325
 
        inv = inventory.Inventory('tree-root')
326
 
        inv.add(parent)
327
 
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
328
 
 
329
 
 
330
 
 
331
 
class TestDeltaApplication(TestCaseWithTransport):
332
 
 
333
 
    def get_empty_inventory(self, reference_inv=None):
334
 
        """Get an empty inventory.
335
 
 
336
 
        Note that tests should not depend on the revision of the root for
337
 
        setting up test conditions, as it has to be flexible to accomodate non
338
 
        rich root repositories.
339
 
 
340
 
        :param reference_inv: If not None, get the revision for the root from
341
 
            this inventory. This is useful for dealing with older repositories
342
 
            that routinely discarded the root entry data. If None, the root's
343
 
            revision is set to 'basis'.
344
 
        """
345
 
        inv = inventory.Inventory()
346
 
        if reference_inv is not None:
347
 
            inv.root.revision = reference_inv.root.revision
348
 
        else:
349
 
            inv.root.revision = 'basis'
350
 
        return inv
351
 
 
352
 
    def test_empty_delta(self):
353
 
        inv = self.get_empty_inventory()
354
 
        delta = []
355
 
        inv = self.apply_delta(self, inv, delta)
356
 
        inv2 = self.get_empty_inventory(inv)
357
 
        self.assertEqual([], inv2._make_delta(inv))
358
 
 
359
 
    def test_None_file_id(self):
360
 
        inv = self.get_empty_inventory()
361
 
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
362
 
        dir1.revision = 'result'
363
 
        delta = [(None, u'dir1', None, dir1)]
364
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
365
 
            inv, delta)
366
 
 
367
 
    def test_unicode_file_id(self):
368
 
        inv = self.get_empty_inventory()
369
 
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
370
 
        dir1.revision = 'result'
371
 
        delta = [(None, u'dir1', dir1.file_id, dir1)]
372
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
373
 
            inv, delta)
374
 
 
375
 
    def test_repeated_file_id(self):
376
 
        inv = self.get_empty_inventory()
377
 
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
378
 
        file1.revision = 'result'
379
 
        file1.text_size = 0
380
 
        file1.text_sha1 = ""
381
 
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
382
 
        file2.revision = 'result'
383
 
        file2.text_size = 0
384
 
        file2.text_sha1 = ""
385
 
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
386
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
387
 
            inv, delta)
388
 
 
389
 
    def test_repeated_new_path(self):
390
 
        inv = self.get_empty_inventory()
391
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
392
 
        file1.revision = 'result'
393
 
        file1.text_size = 0
394
 
        file1.text_sha1 = ""
395
 
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
396
 
        file2.revision = 'result'
397
 
        file2.text_size = 0
398
 
        file2.text_sha1 = ""
399
 
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
400
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
401
 
            inv, delta)
402
 
 
403
 
    def test_repeated_old_path(self):
404
 
        inv = self.get_empty_inventory()
405
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
406
 
        file1.revision = 'result'
407
 
        file1.text_size = 0
408
 
        file1.text_sha1 = ""
409
 
        # We can't *create* a source inventory with the same path, but
410
 
        # a badly generated partial delta might claim the same source twice.
411
 
        # This would be buggy in two ways: the path is repeated in the delta,
412
 
        # And the path for one of the file ids doesn't match the source
413
 
        # location. Alternatively, we could have a repeated fileid, but that
414
 
        # is separately checked for.
415
 
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
416
 
        file2.revision = 'result'
417
 
        file2.text_size = 0
418
 
        file2.text_sha1 = ""
419
 
        inv.add(file1)
420
 
        inv.add(file2)
421
 
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
422
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
423
 
            inv, delta)
424
 
 
425
 
    def test_mismatched_id_entry_id(self):
426
 
        inv = self.get_empty_inventory()
427
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
428
 
        file1.revision = 'result'
429
 
        file1.text_size = 0
430
 
        file1.text_sha1 = ""
431
 
        delta = [(None, u'path', 'id', file1)]
432
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
433
 
            inv, delta)
434
 
 
435
 
    def test_mismatched_new_path_entry_None(self):
436
 
        inv = self.get_empty_inventory()
437
 
        delta = [(None, u'path', 'id', None)]
438
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
439
 
            inv, delta)
440
 
 
441
 
    def test_mismatched_new_path_None_entry(self):
442
 
        inv = self.get_empty_inventory()
443
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
444
 
        file1.revision = 'result'
445
 
        file1.text_size = 0
446
 
        file1.text_sha1 = ""
447
 
        delta = [(u"path", None, 'id1', file1)]
448
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
449
 
            inv, delta)
450
 
 
451
 
    def test_parent_is_not_directory(self):
452
 
        inv = self.get_empty_inventory()
453
 
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
454
 
        file1.revision = 'result'
455
 
        file1.text_size = 0
456
 
        file1.text_sha1 = ""
457
 
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
458
 
        file2.revision = 'result'
459
 
        file2.text_size = 0
460
 
        file2.text_sha1 = ""
461
 
        inv.add(file1)
462
 
        delta = [(None, u'path/path2', 'id2', file2)]
463
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
464
 
            inv, delta)
465
 
 
466
 
    def test_parent_is_missing(self):
467
 
        inv = self.get_empty_inventory()
468
 
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
469
 
        file2.revision = 'result'
470
 
        file2.text_size = 0
471
 
        file2.text_sha1 = ""
472
 
        delta = [(None, u'path/path2', 'id2', file2)]
473
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
474
 
            inv, delta)
475
 
 
476
 
    def test_new_parent_path_has_wrong_id(self):
477
 
        inv = self.get_empty_inventory()
478
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
479
 
        parent1.revision = 'result'
480
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
481
 
        parent2.revision = 'result'
482
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
483
 
        file1.revision = 'result'
484
 
        file1.text_size = 0
485
 
        file1.text_sha1 = ""
486
 
        inv.add(parent1)
487
 
        inv.add(parent2)
488
 
        # This delta claims that file1 is at dir/path, but actually its at
489
 
        # dir2/path if you follow the inventory parent structure.
490
 
        delta = [(None, u'dir/path', 'id', file1)]
491
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
492
 
            inv, delta)
493
 
 
494
 
    def test_old_parent_path_is_wrong(self):
495
 
        inv = self.get_empty_inventory()
496
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
497
 
        parent1.revision = 'result'
498
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
499
 
        parent2.revision = 'result'
500
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
501
 
        file1.revision = 'result'
502
 
        file1.text_size = 0
503
 
        file1.text_sha1 = ""
504
 
        inv.add(parent1)
505
 
        inv.add(parent2)
506
 
        inv.add(file1)
507
 
        # This delta claims that file1 was at dir/path, but actually it was at
508
 
        # dir2/path if you follow the inventory parent structure.
509
 
        delta = [(u'dir/path', None, 'id', None)]
510
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
511
 
            inv, delta)
512
 
 
513
 
    def test_old_parent_path_is_for_other_id(self):
514
 
        inv = self.get_empty_inventory()
515
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
516
 
        parent1.revision = 'result'
517
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
518
 
        parent2.revision = 'result'
519
 
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
520
 
        file1.revision = 'result'
521
 
        file1.text_size = 0
522
 
        file1.text_sha1 = ""
523
 
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
524
 
        file2.revision = 'result'
525
 
        file2.text_size = 0
526
 
        file2.text_sha1 = ""
527
 
        inv.add(parent1)
528
 
        inv.add(parent2)
529
 
        inv.add(file1)
530
 
        inv.add(file2)
531
 
        # This delta claims that file1 was at dir/path, but actually it was at
532
 
        # dir2/path if you follow the inventory parent structure. At dir/path
533
 
        # is another entry we should not delete.
534
 
        delta = [(u'dir/path', None, 'id', None)]
535
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
536
 
            inv, delta)
537
 
 
538
 
    def test_add_existing_id_new_path(self):
539
 
        inv = self.get_empty_inventory()
540
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
541
 
        parent1.revision = 'result'
542
 
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
543
 
        parent2.revision = 'result'
544
 
        inv.add(parent1)
545
 
        delta = [(None, u'dir2', 'p-1', parent2)]
546
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
547
 
            inv, delta)
548
 
 
549
 
    def test_add_new_id_existing_path(self):
550
 
        inv = self.get_empty_inventory()
551
 
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
552
 
        parent1.revision = 'result'
553
 
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
554
 
        parent2.revision = 'result'
555
 
        inv.add(parent1)
556
 
        delta = [(None, u'dir1', 'p-2', parent2)]
557
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
558
 
            inv, delta)
559
 
 
560
 
    def test_remove_dir_leaving_dangling_child(self):
561
 
        inv = self.get_empty_inventory()
562
 
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
563
 
        dir1.revision = 'result'
564
 
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
565
 
        dir2.revision = 'result'
566
 
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
567
 
        dir3.revision = 'result'
568
 
        inv.add(dir1)
569
 
        inv.add(dir2)
570
 
        inv.add(dir3)
571
 
        delta = [(u'dir1', None, 'p-1', None),
572
 
            (u'dir1/child2', None, 'p-3', None)]
573
 
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
574
 
            inv, delta)
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import os
 
19
import time
 
20
 
 
21
from bzrlib import errors, inventory, osutils
 
22
from bzrlib.branch import Branch
 
23
from bzrlib.diff import internal_diff
 
24
from bzrlib.inventory import (Inventory, ROOT_ID, InventoryFile,
 
25
    InventoryDirectory, InventoryEntry)
 
26
from bzrlib.osutils import (has_symlinks, rename, pathjoin, is_inside_any, 
 
27
    is_inside_or_parent_of_any)
 
28
from bzrlib.tests import TestCase, TestCaseWithTransport
 
29
from bzrlib.transform import TreeTransform
 
30
from bzrlib.uncommit import uncommit
575
31
 
576
32
 
577
33
class TestInventory(TestCase):
578
34
 
579
 
    def test_is_root(self):
580
 
        """Ensure our root-checking code is accurate."""
581
 
        inv = inventory.Inventory('TREE_ROOT')
582
 
        self.assertTrue(inv.is_root('TREE_ROOT'))
583
 
        self.assertFalse(inv.is_root('booga'))
584
 
        inv.root.file_id = 'booga'
585
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
586
 
        self.assertTrue(inv.is_root('booga'))
587
 
        # works properly even if no root is set
588
 
        inv.root = None
589
 
        self.assertFalse(inv.is_root('TREE_ROOT'))
590
 
        self.assertFalse(inv.is_root('booga'))
591
 
 
592
 
    def test_entries_for_empty_inventory(self):
593
 
        """Test that entries() will not fail for an empty inventory"""
594
 
        inv = Inventory(root_id=None)
595
 
        self.assertEqual([], inv.entries())
 
35
    def test_is_within(self):
 
36
 
 
37
        SRC_FOO_C = pathjoin('src', 'foo.c')
 
38
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
 
39
                         (['src'], SRC_FOO_C),
 
40
                         (['src'], 'src'),
 
41
                         ]:
 
42
            self.assert_(is_inside_any(dirs, fn))
 
43
            
 
44
        for dirs, fn in [(['src'], 'srccontrol'),
 
45
                         (['src'], 'srccontrol/foo')]:
 
46
            self.assertFalse(is_inside_any(dirs, fn))
 
47
 
 
48
    def test_is_within_or_parent(self):
 
49
        for dirs, fn in [(['src', 'doc'], 'src/foo.c'),
 
50
                         (['src'], 'src/foo.c'),
 
51
                         (['src/bar.c'], 'src'),
 
52
                         (['src/bar.c', 'bla/foo.c'], 'src'),
 
53
                         (['src'], 'src'),
 
54
                         ]:
 
55
            self.assert_(is_inside_or_parent_of_any(dirs, fn))
 
56
            
 
57
        for dirs, fn in [(['src'], 'srccontrol'),
 
58
                         (['srccontrol/foo.c'], 'src'),
 
59
                         (['src'], 'srccontrol/foo')]:
 
60
            self.assertFalse(is_inside_or_parent_of_any(dirs, fn))
 
61
 
 
62
    def test_ids(self):
 
63
        """Test detection of files within selected directories."""
 
64
        inv = Inventory()
 
65
        
 
66
        for args in [('src', 'directory', 'src-id'), 
 
67
                     ('doc', 'directory', 'doc-id'), 
 
68
                     ('src/hello.c', 'file'),
 
69
                     ('src/bye.c', 'file', 'bye-id'),
 
70
                     ('Makefile', 'file')]:
 
71
            inv.add_path(*args)
 
72
            
 
73
        self.assertEqual(inv.path2id('src'), 'src-id')
 
74
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
 
75
        
 
76
        self.assert_('src-id' in inv)
 
77
 
 
78
    def test_iter_entries(self):
 
79
        inv = Inventory()
 
80
        
 
81
        for args in [('src', 'directory', 'src-id'), 
 
82
                     ('doc', 'directory', 'doc-id'), 
 
83
                     ('src/hello.c', 'file', 'hello-id'),
 
84
                     ('src/bye.c', 'file', 'bye-id'),
 
85
                     ('Makefile', 'file', 'makefile-id')]:
 
86
            inv.add_path(*args)
 
87
 
 
88
        self.assertEqual([
 
89
            ('Makefile', 'makefile-id'),
 
90
            ('doc', 'doc-id'),
 
91
            ('src', 'src-id'),
 
92
            ('src/bye.c', 'bye-id'),
 
93
            ('src/hello.c', 'hello-id'),
 
94
            ], [(path, ie.file_id) for path, ie in inv.iter_entries()])
 
95
            
 
96
    def test_iter_entries_by_dir(self):
 
97
        inv = Inventory()
 
98
        
 
99
        for args in [('src', 'directory', 'src-id'), 
 
100
                     ('doc', 'directory', 'doc-id'), 
 
101
                     ('src/hello.c', 'file', 'hello-id'),
 
102
                     ('src/bye.c', 'file', 'bye-id'),
 
103
                     ('zz', 'file', 'zz-id'),
 
104
                     ('src/sub/', 'directory', 'sub-id'),
 
105
                     ('src/zz.c', 'file', 'zzc-id'),
 
106
                     ('src/sub/a', 'file', 'a-id'),
 
107
                     ('Makefile', 'file', 'makefile-id')]:
 
108
            inv.add_path(*args)
 
109
 
 
110
        self.assertEqual([
 
111
            ('Makefile', 'makefile-id'),
 
112
            ('doc', 'doc-id'),
 
113
            ('src', 'src-id'),
 
114
            ('zz', 'zz-id'),
 
115
            ('src/bye.c', 'bye-id'),
 
116
            ('src/hello.c', 'hello-id'),
 
117
            ('src/sub', 'sub-id'),
 
118
            ('src/zz.c', 'zzc-id'),
 
119
            ('src/sub/a', 'a-id'),
 
120
            ], [(path, ie.file_id) for path, ie in inv.iter_entries_by_dir()])
 
121
            
 
122
    def test_version(self):
 
123
        """Inventory remembers the text's version."""
 
124
        inv = Inventory()
 
125
        ie = inv.add_path('foo.txt', 'file')
 
126
        ## XXX
596
127
 
597
128
 
598
129
class TestInventoryEntry(TestCase):
611
142
 
612
143
    def test_dir_detect_changes(self):
613
144
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
145
        left.text_sha1 = 123
 
146
        left.executable = True
 
147
        left.symlink_target='foo'
614
148
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
 
149
        right.text_sha1 = 321
 
150
        right.symlink_target='bar'
615
151
        self.assertEqual((False, False), left.detect_changes(right))
616
152
        self.assertEqual((False, False), right.detect_changes(left))
617
153
 
631
167
 
632
168
    def test_symlink_detect_changes(self):
633
169
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
170
        left.text_sha1 = 123
 
171
        left.executable = True
634
172
        left.symlink_target='foo'
635
173
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
 
174
        right.text_sha1 = 321
636
175
        right.symlink_target='foo'
637
176
        self.assertEqual((False, False), left.detect_changes(right))
638
177
        self.assertEqual((False, False), right.detect_changes(left))
676
215
            osutils.normalized_filename = orig_normalized_filename
677
216
 
678
217
 
 
218
class TestEntryDiffing(TestCaseWithTransport):
 
219
 
 
220
    def setUp(self):
 
221
        super(TestEntryDiffing, self).setUp()
 
222
        self.wt = self.make_branch_and_tree('.')
 
223
        self.branch = self.wt.branch
 
224
        print >> open('file', 'wb'), 'foo'
 
225
        print >> open('binfile', 'wb'), 'foo'
 
226
        self.wt.add(['file'], ['fileid'])
 
227
        self.wt.add(['binfile'], ['binfileid'])
 
228
        if has_symlinks():
 
229
            os.symlink('target1', 'symlink')
 
230
            self.wt.add(['symlink'], ['linkid'])
 
231
        self.wt.commit('message_1', rev_id = '1')
 
232
        print >> open('file', 'wb'), 'bar'
 
233
        print >> open('binfile', 'wb'), 'x' * 1023 + '\x00'
 
234
        if has_symlinks():
 
235
            os.unlink('symlink')
 
236
            os.symlink('target2', 'symlink')
 
237
        self.tree_1 = self.branch.repository.revision_tree('1')
 
238
        self.inv_1 = self.branch.repository.get_inventory('1')
 
239
        self.file_1 = self.inv_1['fileid']
 
240
        self.file_1b = self.inv_1['binfileid']
 
241
        self.tree_2 = self.wt
 
242
        self.inv_2 = self.tree_2.read_working_inventory()
 
243
        self.file_2 = self.inv_2['fileid']
 
244
        self.file_2b = self.inv_2['binfileid']
 
245
        if has_symlinks():
 
246
            self.link_1 = self.inv_1['linkid']
 
247
            self.link_2 = self.inv_2['linkid']
 
248
 
 
249
    def test_file_diff_deleted(self):
 
250
        output = StringIO()
 
251
        self.file_1.diff(internal_diff, 
 
252
                          "old_label", self.tree_1,
 
253
                          "/dev/null", None, None,
 
254
                          output)
 
255
        self.assertEqual(output.getvalue(), "--- old_label\n"
 
256
                                            "+++ /dev/null\n"
 
257
                                            "@@ -1,1 +0,0 @@\n"
 
258
                                            "-foo\n"
 
259
                                            "\n")
 
260
 
 
261
    def test_file_diff_added(self):
 
262
        output = StringIO()
 
263
        self.file_1.diff(internal_diff, 
 
264
                          "new_label", self.tree_1,
 
265
                          "/dev/null", None, None,
 
266
                          output, reverse=True)
 
267
        self.assertEqual(output.getvalue(), "--- /dev/null\n"
 
268
                                            "+++ new_label\n"
 
269
                                            "@@ -0,0 +1,1 @@\n"
 
270
                                            "+foo\n"
 
271
                                            "\n")
 
272
 
 
273
    def test_file_diff_changed(self):
 
274
        output = StringIO()
 
275
        self.file_1.diff(internal_diff, 
 
276
                          "/dev/null", self.tree_1, 
 
277
                          "new_label", self.file_2, self.tree_2,
 
278
                          output)
 
279
        self.assertEqual(output.getvalue(), "--- /dev/null\n"
 
280
                                            "+++ new_label\n"
 
281
                                            "@@ -1,1 +1,1 @@\n"
 
282
                                            "-foo\n"
 
283
                                            "+bar\n"
 
284
                                            "\n")
 
285
        
 
286
    def test_file_diff_binary(self):
 
287
        output = StringIO()
 
288
        self.file_1.diff(internal_diff, 
 
289
                          "/dev/null", self.tree_1, 
 
290
                          "new_label", self.file_2b, self.tree_2,
 
291
                          output)
 
292
        self.assertEqual(output.getvalue(), 
 
293
                         "Binary files /dev/null and new_label differ\n")
 
294
    def test_link_diff_deleted(self):
 
295
        if not has_symlinks():
 
296
            return
 
297
        output = StringIO()
 
298
        self.link_1.diff(internal_diff, 
 
299
                          "old_label", self.tree_1,
 
300
                          "/dev/null", None, None,
 
301
                          output)
 
302
        self.assertEqual(output.getvalue(),
 
303
                         "=== target was 'target1'\n")
 
304
 
 
305
    def test_link_diff_added(self):
 
306
        if not has_symlinks():
 
307
            return
 
308
        output = StringIO()
 
309
        self.link_1.diff(internal_diff, 
 
310
                          "new_label", self.tree_1,
 
311
                          "/dev/null", None, None,
 
312
                          output, reverse=True)
 
313
        self.assertEqual(output.getvalue(),
 
314
                         "=== target is 'target1'\n")
 
315
 
 
316
    def test_link_diff_changed(self):
 
317
        if not has_symlinks():
 
318
            return
 
319
        output = StringIO()
 
320
        self.link_1.diff(internal_diff, 
 
321
                          "/dev/null", self.tree_1, 
 
322
                          "new_label", self.link_2, self.tree_2,
 
323
                          output)
 
324
        self.assertEqual(output.getvalue(),
 
325
                         "=== target changed 'target1' => 'target2'\n")
 
326
 
 
327
 
 
328
class TestSnapshot(TestCaseWithTransport):
 
329
 
 
330
    def setUp(self):
 
331
        # for full testing we'll need a branch
 
332
        # with a subdir to test parent changes.
 
333
        # and a file, link and dir under that.
 
334
        # but right now I only need one attribute
 
335
        # to change, and then test merge patterns
 
336
        # with fake parent entries.
 
337
        super(TestSnapshot, self).setUp()
 
338
        self.wt = self.make_branch_and_tree('.')
 
339
        self.branch = self.wt.branch
 
340
        self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
 
341
        self.wt.add(['subdir', 'subdir/file'],
 
342
                                       ['dirid', 'fileid'])
 
343
        if has_symlinks():
 
344
            pass
 
345
        self.wt.commit('message_1', rev_id = '1')
 
346
        self.tree_1 = self.branch.repository.revision_tree('1')
 
347
        self.inv_1 = self.branch.repository.get_inventory('1')
 
348
        self.file_1 = self.inv_1['fileid']
 
349
        self.file_active = self.wt.inventory['fileid']
 
350
        self.builder = self.branch.get_commit_builder([], timestamp=time.time(), revision_id='2')
 
351
 
 
352
    def test_snapshot_new_revision(self):
 
353
        # This tests that a simple commit with no parents makes a new
 
354
        # revision value in the inventory entry
 
355
        self.file_active.snapshot('2', 'subdir/file', {}, self.wt, self.builder)
 
356
        # expected outcome - file_1 has a revision id of '2', and we can get
 
357
        # its text of 'file contents' out of the weave.
 
358
        self.assertEqual(self.file_1.revision, '1')
 
359
        self.assertEqual(self.file_active.revision, '2')
 
360
        # this should be a separate test probably, but lets check it once..
 
361
        lines = self.branch.repository.weave_store.get_weave(
 
362
            'fileid', 
 
363
            self.branch.get_transaction()).get_lines('2')
 
364
        self.assertEqual(lines, ['contents of subdir/file\n'])
 
365
 
 
366
    def test_snapshot_unchanged(self):
 
367
        #This tests that a simple commit does not make a new entry for
 
368
        # an unchanged inventory entry
 
369
        self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
 
370
                                  self.wt, self.builder)
 
371
        self.assertEqual(self.file_1.revision, '1')
 
372
        self.assertEqual(self.file_active.revision, '1')
 
373
        vf = self.branch.repository.weave_store.get_weave(
 
374
            'fileid', 
 
375
            self.branch.repository.get_transaction())
 
376
        self.assertRaises(errors.RevisionNotPresent,
 
377
                          vf.get_lines,
 
378
                          '2')
 
379
 
 
380
    def test_snapshot_merge_identical_different_revid(self):
 
381
        # This tests that a commit with two identical parents, one of which has
 
382
        # a different revision id, results in a new revision id in the entry.
 
383
        # 1->other, commit a merge of other against 1, results in 2.
 
384
        other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
 
385
        other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
 
386
        other_ie.revision = '1'
 
387
        other_ie.text_sha1 = self.file_1.text_sha1
 
388
        other_ie.text_size = self.file_1.text_size
 
389
        self.assertEqual(self.file_1, other_ie)
 
390
        other_ie.revision = 'other'
 
391
        self.assertNotEqual(self.file_1, other_ie)
 
392
        versionfile = self.branch.repository.weave_store.get_weave(
 
393
            'fileid', self.branch.repository.get_transaction())
 
394
        versionfile.clone_text('other', '1', ['1'])
 
395
        self.file_active.snapshot('2', 'subdir/file', 
 
396
                                  {'1':self.file_1, 'other':other_ie},
 
397
                                  self.wt, self.builder)
 
398
        self.assertEqual(self.file_active.revision, '2')
 
399
 
 
400
    def test_snapshot_changed(self):
 
401
        # This tests that a commit with one different parent results in a new
 
402
        # revision id in the entry.
 
403
        self.file_active.name='newname'
 
404
        rename('subdir/file', 'subdir/newname')
 
405
        self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1}, 
 
406
                                  self.wt, self.builder)
 
407
        # expected outcome - file_1 has a revision id of '2'
 
408
        self.assertEqual(self.file_active.revision, '2')
 
409
 
 
410
 
 
411
class TestPreviousHeads(TestCaseWithTransport):
 
412
 
 
413
    def setUp(self):
 
414
        # we want several inventories, that respectively
 
415
        # give use the following scenarios:
 
416
        # A) fileid not in any inventory (A),
 
417
        # B) fileid present in one inventory (B) and (A,B)
 
418
        # C) fileid present in two inventories, and they
 
419
        #   are not mutual descendents (B, C)
 
420
        # D) fileid present in two inventories and one is
 
421
        #   a descendent of the other. (B, D)
 
422
        super(TestPreviousHeads, self).setUp()
 
423
        self.wt = self.make_branch_and_tree('.')
 
424
        self.branch = self.wt.branch
 
425
        self.build_tree(['file'])
 
426
        self.wt.commit('new branch', allow_pointless=True, rev_id='A')
 
427
        self.inv_A = self.branch.repository.get_inventory('A')
 
428
        self.wt.add(['file'], ['fileid'])
 
429
        self.wt.commit('add file', rev_id='B')
 
430
        self.inv_B = self.branch.repository.get_inventory('B')
 
431
        uncommit(self.branch, tree=self.wt)
 
432
        self.assertEqual(self.branch.revision_history(), ['A'])
 
433
        self.wt.commit('another add of file', rev_id='C')
 
434
        self.inv_C = self.branch.repository.get_inventory('C')
 
435
        self.wt.add_pending_merge('B')
 
436
        self.wt.commit('merge in B', rev_id='D')
 
437
        self.inv_D = self.branch.repository.get_inventory('D')
 
438
        self.file_active = self.wt.inventory['fileid']
 
439
        self.weave = self.branch.repository.weave_store.get_weave('fileid',
 
440
            self.branch.repository.get_transaction())
 
441
        
 
442
    def get_previous_heads(self, inventories):
 
443
        return self.file_active.find_previous_heads(
 
444
            inventories, 
 
445
            self.branch.repository.weave_store,
 
446
            self.branch.repository.get_transaction())
 
447
        
 
448
    def test_fileid_in_no_inventory(self):
 
449
        self.assertEqual({}, self.get_previous_heads([self.inv_A]))
 
450
 
 
451
    def test_fileid_in_one_inventory(self):
 
452
        self.assertEqual({'B':self.inv_B['fileid']},
 
453
                         self.get_previous_heads([self.inv_B]))
 
454
        self.assertEqual({'B':self.inv_B['fileid']},
 
455
                         self.get_previous_heads([self.inv_A, self.inv_B]))
 
456
        self.assertEqual({'B':self.inv_B['fileid']},
 
457
                         self.get_previous_heads([self.inv_B, self.inv_A]))
 
458
 
 
459
    def test_fileid_in_two_inventories_gives_both_entries(self):
 
460
        self.assertEqual({'B':self.inv_B['fileid'],
 
461
                          'C':self.inv_C['fileid']},
 
462
                          self.get_previous_heads([self.inv_B, self.inv_C]))
 
463
        self.assertEqual({'B':self.inv_B['fileid'],
 
464
                          'C':self.inv_C['fileid']},
 
465
                          self.get_previous_heads([self.inv_C, self.inv_B]))
 
466
 
 
467
    def test_fileid_in_two_inventories_already_merged_gives_head(self):
 
468
        self.assertEqual({'D':self.inv_D['fileid']},
 
469
                         self.get_previous_heads([self.inv_B, self.inv_D]))
 
470
        self.assertEqual({'D':self.inv_D['fileid']},
 
471
                         self.get_previous_heads([self.inv_D, self.inv_B]))
 
472
 
 
473
    # TODO: test two inventories with the same file revision 
 
474
 
 
475
 
679
476
class TestDescribeChanges(TestCase):
680
477
 
681
478
    def test_describe_change(self):
706
503
        # perhaps a bit questionable but seems like the most reasonable thing...
707
504
        self.assertChangeDescription('unchanged', None, None)
708
505
 
709
 
        # in this case it's both renamed and modified; show a rename and
 
506
        # in this case it's both renamed and modified; show a rename and 
710
507
        # modification:
711
508
        new_a.name = 'newfilename'
712
509
        self.assertChangeDescription('modified and renamed', old_a, new_a)
734
531
        self.assertEqual(expected_change, change)
735
532
 
736
533
 
737
 
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
738
 
 
739
 
    def get_chk_bytes(self):
740
 
        factory = groupcompress.make_pack_factory(True, True, 1)
741
 
        trans = self.get_transport('')
742
 
        return factory(trans)
743
 
 
744
 
    def read_bytes(self, chk_bytes, key):
745
 
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
746
 
        return stream.next().get_bytes_as("fulltext")
747
 
 
748
 
    def test_deserialise_gives_CHKInventory(self):
749
 
        inv = Inventory()
750
 
        inv.revision_id = "revid"
751
 
        inv.root.revision = "rootrev"
752
 
        chk_bytes = self.get_chk_bytes()
753
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
754
 
        bytes = ''.join(chk_inv.to_lines())
755
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
756
 
        self.assertEqual("revid", new_inv.revision_id)
757
 
        self.assertEqual("directory", new_inv.root.kind)
758
 
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
759
 
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
760
 
        self.assertEqual(inv.root.name, new_inv.root.name)
761
 
        self.assertEqual("rootrev", new_inv.root.revision)
762
 
        self.assertEqual('plain', new_inv._search_key_name)
763
 
 
764
 
    def test_deserialise_wrong_revid(self):
765
 
        inv = Inventory()
766
 
        inv.revision_id = "revid"
767
 
        inv.root.revision = "rootrev"
768
 
        chk_bytes = self.get_chk_bytes()
769
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
770
 
        bytes = ''.join(chk_inv.to_lines())
771
 
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
772
 
            bytes, ("revid2",))
773
 
 
774
 
    def test_captures_rev_root_byid(self):
775
 
        inv = Inventory()
776
 
        inv.revision_id = "foo"
777
 
        inv.root.revision = "bar"
778
 
        chk_bytes = self.get_chk_bytes()
779
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
780
 
        lines = chk_inv.to_lines()
781
 
        self.assertEqual([
782
 
            'chkinventory:\n',
783
 
            'revision_id: foo\n',
784
 
            'root_id: TREE_ROOT\n',
785
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
786
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
787
 
            ], lines)
788
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
789
 
        self.assertEqual('plain', chk_inv._search_key_name)
790
 
 
791
 
    def test_captures_parent_id_basename_index(self):
792
 
        inv = Inventory()
793
 
        inv.revision_id = "foo"
794
 
        inv.root.revision = "bar"
795
 
        chk_bytes = self.get_chk_bytes()
796
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
797
 
        lines = chk_inv.to_lines()
798
 
        self.assertEqual([
799
 
            'chkinventory:\n',
800
 
            'revision_id: foo\n',
801
 
            'root_id: TREE_ROOT\n',
802
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
803
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
804
 
            ], lines)
805
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
806
 
        self.assertEqual('plain', chk_inv._search_key_name)
807
 
 
808
 
    def test_captures_search_key_name(self):
809
 
        inv = Inventory()
810
 
        inv.revision_id = "foo"
811
 
        inv.root.revision = "bar"
812
 
        chk_bytes = self.get_chk_bytes()
813
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
814
 
                                              search_key_name='hash-16-way')
815
 
        lines = chk_inv.to_lines()
816
 
        self.assertEqual([
817
 
            'chkinventory:\n',
818
 
            'search_key_name: hash-16-way\n',
819
 
            'root_id: TREE_ROOT\n',
820
 
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
821
 
            'revision_id: foo\n',
822
 
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
823
 
            ], lines)
824
 
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
825
 
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
826
 
 
827
 
    def test_directory_children_on_demand(self):
828
 
        inv = Inventory()
829
 
        inv.revision_id = "revid"
830
 
        inv.root.revision = "rootrev"
831
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
832
 
        inv["fileid"].revision = "filerev"
833
 
        inv["fileid"].executable = True
834
 
        inv["fileid"].text_sha1 = "ffff"
835
 
        inv["fileid"].text_size = 1
836
 
        chk_bytes = self.get_chk_bytes()
837
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
838
 
        bytes = ''.join(chk_inv.to_lines())
839
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
840
 
        root_entry = new_inv[inv.root.file_id]
841
 
        self.assertEqual(None, root_entry._children)
842
 
        self.assertEqual(['file'], root_entry.children.keys())
843
 
        file_direct = new_inv["fileid"]
844
 
        file_found = root_entry.children['file']
845
 
        self.assertEqual(file_direct.kind, file_found.kind)
846
 
        self.assertEqual(file_direct.file_id, file_found.file_id)
847
 
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
848
 
        self.assertEqual(file_direct.name, file_found.name)
849
 
        self.assertEqual(file_direct.revision, file_found.revision)
850
 
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
851
 
        self.assertEqual(file_direct.text_size, file_found.text_size)
852
 
        self.assertEqual(file_direct.executable, file_found.executable)
853
 
 
854
 
    def test_from_inventory_maximum_size(self):
855
 
        # from_inventory supports the maximum_size parameter.
856
 
        inv = Inventory()
857
 
        inv.revision_id = "revid"
858
 
        inv.root.revision = "rootrev"
859
 
        chk_bytes = self.get_chk_bytes()
860
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
861
 
        chk_inv.id_to_entry._ensure_root()
862
 
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
863
 
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
864
 
        p_id_basename = chk_inv.parent_id_basename_to_file_id
865
 
        p_id_basename._ensure_root()
866
 
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
867
 
        self.assertEqual(2, p_id_basename._root_node._key_width)
868
 
 
869
 
    def test___iter__(self):
870
 
        inv = Inventory()
871
 
        inv.revision_id = "revid"
872
 
        inv.root.revision = "rootrev"
873
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
874
 
        inv["fileid"].revision = "filerev"
875
 
        inv["fileid"].executable = True
876
 
        inv["fileid"].text_sha1 = "ffff"
877
 
        inv["fileid"].text_size = 1
878
 
        chk_bytes = self.get_chk_bytes()
879
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
880
 
        bytes = ''.join(chk_inv.to_lines())
881
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
882
 
        fileids = list(new_inv.__iter__())
883
 
        fileids.sort()
884
 
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
885
 
 
886
 
    def test__len__(self):
887
 
        inv = Inventory()
888
 
        inv.revision_id = "revid"
889
 
        inv.root.revision = "rootrev"
890
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
891
 
        inv["fileid"].revision = "filerev"
892
 
        inv["fileid"].executable = True
893
 
        inv["fileid"].text_sha1 = "ffff"
894
 
        inv["fileid"].text_size = 1
895
 
        chk_bytes = self.get_chk_bytes()
896
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
897
 
        self.assertEqual(2, len(chk_inv))
898
 
 
899
 
    def test___getitem__(self):
900
 
        inv = Inventory()
901
 
        inv.revision_id = "revid"
902
 
        inv.root.revision = "rootrev"
903
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
904
 
        inv["fileid"].revision = "filerev"
905
 
        inv["fileid"].executable = True
906
 
        inv["fileid"].text_sha1 = "ffff"
907
 
        inv["fileid"].text_size = 1
908
 
        chk_bytes = self.get_chk_bytes()
909
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
910
 
        bytes = ''.join(chk_inv.to_lines())
911
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
912
 
        root_entry = new_inv[inv.root.file_id]
913
 
        file_entry = new_inv["fileid"]
914
 
        self.assertEqual("directory", root_entry.kind)
915
 
        self.assertEqual(inv.root.file_id, root_entry.file_id)
916
 
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
917
 
        self.assertEqual(inv.root.name, root_entry.name)
918
 
        self.assertEqual("rootrev", root_entry.revision)
919
 
        self.assertEqual("file", file_entry.kind)
920
 
        self.assertEqual("fileid", file_entry.file_id)
921
 
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
922
 
        self.assertEqual("file", file_entry.name)
923
 
        self.assertEqual("filerev", file_entry.revision)
924
 
        self.assertEqual("ffff", file_entry.text_sha1)
925
 
        self.assertEqual(1, file_entry.text_size)
926
 
        self.assertEqual(True, file_entry.executable)
927
 
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
928
 
 
929
 
    def test_has_id_true(self):
930
 
        inv = Inventory()
931
 
        inv.revision_id = "revid"
932
 
        inv.root.revision = "rootrev"
933
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
934
 
        inv["fileid"].revision = "filerev"
935
 
        inv["fileid"].executable = True
936
 
        inv["fileid"].text_sha1 = "ffff"
937
 
        inv["fileid"].text_size = 1
938
 
        chk_bytes = self.get_chk_bytes()
939
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
940
 
        self.assertTrue(chk_inv.has_id('fileid'))
941
 
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
942
 
 
943
 
    def test_has_id_not(self):
944
 
        inv = Inventory()
945
 
        inv.revision_id = "revid"
946
 
        inv.root.revision = "rootrev"
947
 
        chk_bytes = self.get_chk_bytes()
948
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
949
 
        self.assertFalse(chk_inv.has_id('fileid'))
950
 
 
951
 
    def test_id2path(self):
952
 
        inv = Inventory()
953
 
        inv.revision_id = "revid"
954
 
        inv.root.revision = "rootrev"
955
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
956
 
        fileentry = InventoryFile("fileid", "file", "dirid")
957
 
        inv.add(direntry)
958
 
        inv.add(fileentry)
959
 
        inv["fileid"].revision = "filerev"
960
 
        inv["fileid"].executable = True
961
 
        inv["fileid"].text_sha1 = "ffff"
962
 
        inv["fileid"].text_size = 1
963
 
        inv["dirid"].revision = "filerev"
964
 
        chk_bytes = self.get_chk_bytes()
965
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
966
 
        bytes = ''.join(chk_inv.to_lines())
967
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
968
 
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
969
 
        self.assertEqual('dir', new_inv.id2path('dirid'))
970
 
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
971
 
 
972
 
    def test_path2id(self):
973
 
        inv = Inventory()
974
 
        inv.revision_id = "revid"
975
 
        inv.root.revision = "rootrev"
976
 
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
977
 
        fileentry = InventoryFile("fileid", "file", "dirid")
978
 
        inv.add(direntry)
979
 
        inv.add(fileentry)
980
 
        inv["fileid"].revision = "filerev"
981
 
        inv["fileid"].executable = True
982
 
        inv["fileid"].text_sha1 = "ffff"
983
 
        inv["fileid"].text_size = 1
984
 
        inv["dirid"].revision = "filerev"
985
 
        chk_bytes = self.get_chk_bytes()
986
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
987
 
        bytes = ''.join(chk_inv.to_lines())
988
 
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
989
 
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
990
 
        self.assertEqual('dirid', new_inv.path2id('dir'))
991
 
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
992
 
 
993
 
    def test_create_by_apply_delta_sets_root(self):
994
 
        inv = Inventory()
995
 
        inv.revision_id = "revid"
996
 
        chk_bytes = self.get_chk_bytes()
997
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
998
 
        inv.add_path("", "directory", "myrootid", None)
999
 
        inv.revision_id = "expectedid"
1000
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1001
 
        delta = [("", None, base_inv.root.file_id, None),
1002
 
            (None, "",  "myrootid", inv.root)]
1003
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1004
 
        self.assertEquals(reference_inv.root, new_inv.root)
1005
 
 
1006
 
    def test_create_by_apply_delta_empty_add_child(self):
1007
 
        inv = Inventory()
1008
 
        inv.revision_id = "revid"
1009
 
        inv.root.revision = "rootrev"
1010
 
        chk_bytes = self.get_chk_bytes()
1011
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1012
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1013
 
        a_entry.revision = "filerev"
1014
 
        a_entry.executable = True
1015
 
        a_entry.text_sha1 = "ffff"
1016
 
        a_entry.text_size = 1
1017
 
        inv.add(a_entry)
1018
 
        inv.revision_id = "expectedid"
1019
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1020
 
        delta = [(None, "A",  "A-id", a_entry)]
1021
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1022
 
        # new_inv should be the same as reference_inv.
1023
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1024
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1025
 
        reference_inv.id_to_entry._ensure_root()
1026
 
        new_inv.id_to_entry._ensure_root()
1027
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1028
 
            new_inv.id_to_entry._root_node._key)
1029
 
 
1030
 
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
1031
 
        inv = Inventory()
1032
 
        inv.revision_id = "revid"
1033
 
        inv.root.revision = "rootrev"
1034
 
        chk_bytes = self.get_chk_bytes()
1035
 
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
1036
 
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
1037
 
        a_entry.revision = "filerev"
1038
 
        a_entry.executable = True
1039
 
        a_entry.text_sha1 = "ffff"
1040
 
        a_entry.text_size = 1
1041
 
        inv.add(a_entry)
1042
 
        inv.revision_id = "expectedid"
1043
 
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
1044
 
        delta = [(None, "A",  "A-id", a_entry)]
1045
 
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
1046
 
        reference_inv.id_to_entry._ensure_root()
1047
 
        reference_inv.parent_id_basename_to_file_id._ensure_root()
1048
 
        new_inv.id_to_entry._ensure_root()
1049
 
        new_inv.parent_id_basename_to_file_id._ensure_root()
1050
 
        # new_inv should be the same as reference_inv.
1051
 
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
1052
 
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
1053
 
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
1054
 
            new_inv.id_to_entry._root_node._key)
1055
 
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
1056
 
            new_inv.parent_id_basename_to_file_id._root_node._key)
1057
 
 
1058
 
    def test_iter_changes(self):
1059
 
        # Low level bootstrapping smoke test; comprehensive generic tests via
1060
 
        # InterTree are coming.
1061
 
        inv = Inventory()
1062
 
        inv.revision_id = "revid"
1063
 
        inv.root.revision = "rootrev"
1064
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1065
 
        inv["fileid"].revision = "filerev"
1066
 
        inv["fileid"].executable = True
1067
 
        inv["fileid"].text_sha1 = "ffff"
1068
 
        inv["fileid"].text_size = 1
1069
 
        inv2 = Inventory()
1070
 
        inv2.revision_id = "revid2"
1071
 
        inv2.root.revision = "rootrev"
1072
 
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
1073
 
        inv2["fileid"].revision = "filerev2"
1074
 
        inv2["fileid"].executable = False
1075
 
        inv2["fileid"].text_sha1 = "bbbb"
1076
 
        inv2["fileid"].text_size = 2
1077
 
        # get fresh objects.
1078
 
        chk_bytes = self.get_chk_bytes()
1079
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
1080
 
        bytes = ''.join(chk_inv.to_lines())
1081
 
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1082
 
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
1083
 
        bytes = ''.join(chk_inv2.to_lines())
1084
 
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
1085
 
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
1086
 
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
1087
 
            (False, True))],
1088
 
            list(inv_1.iter_changes(inv_2)))
1089
 
 
1090
 
    def test_parent_id_basename_to_file_id_index_enabled(self):
1091
 
        inv = Inventory()
1092
 
        inv.revision_id = "revid"
1093
 
        inv.root.revision = "rootrev"
1094
 
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
1095
 
        inv["fileid"].revision = "filerev"
1096
 
        inv["fileid"].executable = True
1097
 
        inv["fileid"].text_sha1 = "ffff"
1098
 
        inv["fileid"].text_size = 1
1099
 
        # get fresh objects.
1100
 
        chk_bytes = self.get_chk_bytes()
1101
 
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
1102
 
        bytes = ''.join(tmp_inv.to_lines())
1103
 
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1104
 
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
1105
 
        self.assertEqual(
1106
 
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
1107
 
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
1108
 
 
1109
 
    def test_file_entry_to_bytes(self):
1110
 
        inv = CHKInventory(None)
1111
 
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
1112
 
        ie.executable = True
1113
 
        ie.revision = 'file-rev-id'
1114
 
        ie.text_sha1 = 'abcdefgh'
1115
 
        ie.text_size = 100
1116
 
        bytes = inv._entry_to_bytes(ie)
1117
 
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
1118
 
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
1119
 
        ie2 = inv._bytes_to_entry(bytes)
1120
 
        self.assertEqual(ie, ie2)
1121
 
        self.assertIsInstance(ie2.name, unicode)
1122
 
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
1123
 
                         inv._bytes_to_utf8name_key(bytes))
1124
 
 
1125
 
    def test_file2_entry_to_bytes(self):
1126
 
        inv = CHKInventory(None)
1127
 
        # \u30a9 == 'omega'
1128
 
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
1129
 
        ie.executable = False
1130
 
        ie.revision = 'file-rev-id'
1131
 
        ie.text_sha1 = '123456'
1132
 
        ie.text_size = 25
1133
 
        bytes = inv._entry_to_bytes(ie)
1134
 
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
1135
 
                         'file-rev-id\n123456\n25\nN', bytes)
1136
 
        ie2 = inv._bytes_to_entry(bytes)
1137
 
        self.assertEqual(ie, ie2)
1138
 
        self.assertIsInstance(ie2.name, unicode)
1139
 
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
1140
 
                         inv._bytes_to_utf8name_key(bytes))
1141
 
 
1142
 
    def test_dir_entry_to_bytes(self):
1143
 
        inv = CHKInventory(None)
1144
 
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
1145
 
        ie.revision = 'dir-rev-id'
1146
 
        bytes = inv._entry_to_bytes(ie)
1147
 
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
1148
 
        ie2 = inv._bytes_to_entry(bytes)
1149
 
        self.assertEqual(ie, ie2)
1150
 
        self.assertIsInstance(ie2.name, unicode)
1151
 
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
1152
 
                         inv._bytes_to_utf8name_key(bytes))
1153
 
 
1154
 
    def test_dir2_entry_to_bytes(self):
1155
 
        inv = CHKInventory(None)
1156
 
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
1157
 
                                          None)
1158
 
        ie.revision = 'dir-rev-id'
1159
 
        bytes = inv._entry_to_bytes(ie)
1160
 
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
1161
 
                         'dir-rev-id', bytes)
1162
 
        ie2 = inv._bytes_to_entry(bytes)
1163
 
        self.assertEqual(ie, ie2)
1164
 
        self.assertIsInstance(ie2.name, unicode)
1165
 
        self.assertIs(ie2.parent_id, None)
1166
 
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
1167
 
                         inv._bytes_to_utf8name_key(bytes))
1168
 
 
1169
 
    def test_symlink_entry_to_bytes(self):
1170
 
        inv = CHKInventory(None)
1171
 
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
1172
 
        ie.revision = 'link-rev-id'
1173
 
        ie.symlink_target = u'target/path'
1174
 
        bytes = inv._entry_to_bytes(ie)
1175
 
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
1176
 
                         'link-rev-id\ntarget/path', bytes)
1177
 
        ie2 = inv._bytes_to_entry(bytes)
1178
 
        self.assertEqual(ie, ie2)
1179
 
        self.assertIsInstance(ie2.name, unicode)
1180
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1181
 
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
1182
 
                         inv._bytes_to_utf8name_key(bytes))
1183
 
 
1184
 
    def test_symlink2_entry_to_bytes(self):
1185
 
        inv = CHKInventory(None)
1186
 
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
1187
 
        ie.revision = 'link-rev-id'
1188
 
        ie.symlink_target = u'target/\u03a9path'
1189
 
        bytes = inv._entry_to_bytes(ie)
1190
 
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
1191
 
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
1192
 
        ie2 = inv._bytes_to_entry(bytes)
1193
 
        self.assertEqual(ie, ie2)
1194
 
        self.assertIsInstance(ie2.name, unicode)
1195
 
        self.assertIsInstance(ie2.symlink_target, unicode)
1196
 
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
1197
 
                         inv._bytes_to_utf8name_key(bytes))
1198
 
 
1199
 
    def test_tree_reference_entry_to_bytes(self):
1200
 
        inv = CHKInventory(None)
1201
 
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
1202
 
                                     'parent-id')
1203
 
        ie.revision = 'tree-rev-id'
1204
 
        ie.reference_revision = 'ref-rev-id'
1205
 
        bytes = inv._entry_to_bytes(ie)
1206
 
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
1207
 
                         'tree-rev-id\nref-rev-id', bytes)
1208
 
        ie2 = inv._bytes_to_entry(bytes)
1209
 
        self.assertEqual(ie, ie2)
1210
 
        self.assertIsInstance(ie2.name, unicode)
1211
 
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
1212
 
                         inv._bytes_to_utf8name_key(bytes))
1213
 
 
1214
 
 
1215
 
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
1216
 
 
1217
 
    def get_chk_bytes(self):
1218
 
        factory = groupcompress.make_pack_factory(True, True, 1)
1219
 
        trans = self.get_transport('')
1220
 
        return factory(trans)
1221
 
 
1222
 
    def make_dir(self, inv, name, parent_id):
1223
 
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
1224
 
 
1225
 
    def make_file(self, inv, name, parent_id, content='content\n'):
1226
 
        ie = inv.make_entry('file', name, parent_id, name + '-id')
1227
 
        ie.text_sha1 = osutils.sha_string(content)
1228
 
        ie.text_size = len(content)
1229
 
        inv.add(ie)
1230
 
 
1231
 
    def make_simple_inventory(self):
1232
 
        inv = Inventory('TREE_ROOT')
1233
 
        inv.revision_id = "revid"
1234
 
        inv.root.revision = "rootrev"
1235
 
        # /                 TREE_ROOT
1236
 
        # dir1/             dir1-id
1237
 
        #   sub-file1       sub-file1-id
1238
 
        #   sub-file2       sub-file2-id
1239
 
        #   sub-dir1/       sub-dir1-id
1240
 
        #     subsub-file1  subsub-file1-id
1241
 
        # dir2/             dir2-id
1242
 
        #   sub2-file1      sub2-file1-id
1243
 
        # top               top-id
1244
 
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
1245
 
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
1246
 
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
1247
 
        self.make_file(inv, 'top', 'TREE_ROOT')
1248
 
        self.make_file(inv, 'sub-file1', 'dir1-id')
1249
 
        self.make_file(inv, 'sub-file2', 'dir1-id')
1250
 
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
1251
 
        self.make_file(inv, 'sub2-file1', 'dir2-id')
1252
 
        chk_bytes = self.get_chk_bytes()
1253
 
        #  use a small maximum_size to force internal paging structures
1254
 
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
1255
 
                        maximum_size=100,
1256
 
                        search_key_name='hash-255-way')
1257
 
        bytes = ''.join(chk_inv.to_lines())
1258
 
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
1259
 
 
1260
 
    def assert_Getitems(self, expected_fileids, inv, file_ids):
1261
 
        self.assertEqual(sorted(expected_fileids),
1262
 
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
1263
 
 
1264
 
    def assertExpand(self, all_ids, inv, file_ids):
1265
 
        (val_all_ids,
1266
 
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
1267
 
        self.assertEqual(set(all_ids), val_all_ids)
1268
 
        entries = inv._getitems(val_all_ids)
1269
 
        expected_children = {}
1270
 
        for entry in entries:
1271
 
            s = expected_children.setdefault(entry.parent_id, [])
1272
 
            s.append(entry.file_id)
1273
 
        val_children = dict((k, sorted(v)) for k, v
1274
 
                            in val_children.iteritems())
1275
 
        expected_children = dict((k, sorted(v)) for k, v
1276
 
                            in expected_children.iteritems())
1277
 
        self.assertEqual(expected_children, val_children)
1278
 
 
1279
 
    def test_make_simple_inventory(self):
1280
 
        inv = self.make_simple_inventory()
1281
 
        layout = []
1282
 
        for path, entry in inv.iter_entries_by_dir():
1283
 
            layout.append((path, entry.file_id))
1284
 
        self.assertEqual([
1285
 
            ('', 'TREE_ROOT'),
1286
 
            ('dir1', 'dir1-id'),
1287
 
            ('dir2', 'dir2-id'),
1288
 
            ('top', 'top-id'),
1289
 
            ('dir1/sub-dir1', 'sub-dir1-id'),
1290
 
            ('dir1/sub-file1', 'sub-file1-id'),
1291
 
            ('dir1/sub-file2', 'sub-file2-id'),
1292
 
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
1293
 
            ('dir2/sub2-file1', 'sub2-file1-id'),
1294
 
            ], layout)
1295
 
 
1296
 
    def test__getitems(self):
1297
 
        inv = self.make_simple_inventory()
1298
 
        # Reading from disk
1299
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1300
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1301
 
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
1302
 
        # From cache
1303
 
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
1304
 
        # Mixed
1305
 
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
1306
 
                             ['dir1-id', 'sub-file2-id'])
1307
 
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
1308
 
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
1309
 
 
1310
 
    def test_single_file(self):
1311
 
        inv = self.make_simple_inventory()
1312
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1313
 
 
1314
 
    def test_get_all_parents(self):
1315
 
        inv = self.make_simple_inventory()
1316
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1317
 
                           'subsub-file1-id',
1318
 
                          ], inv, ['subsub-file1-id'])
1319
 
 
1320
 
    def test_get_children(self):
1321
 
        inv = self.make_simple_inventory()
1322
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1323
 
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
1324
 
                          ], inv, ['dir1-id'])
1325
 
 
1326
 
    def test_from_root(self):
1327
 
        inv = self.make_simple_inventory()
1328
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
1329
 
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
1330
 
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
1331
 
 
1332
 
    def test_top_level_file(self):
1333
 
        inv = self.make_simple_inventory()
1334
 
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
1335
 
 
1336
 
    def test_subsub_file(self):
1337
 
        inv = self.make_simple_inventory()
1338
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
1339
 
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
1340
 
 
1341
 
    def test_sub_and_root(self):
1342
 
        inv = self.make_simple_inventory()
1343
 
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
1344
 
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])
 
534
class TestRevert(TestCaseWithTransport):
 
535
 
 
536
    def test_dangling_id(self):
 
537
        wt = self.make_branch_and_tree('b1')
 
538
        self.assertEqual(len(wt.inventory), 1)
 
539
        open('b1/a', 'wb').write('a test\n')
 
540
        wt.add('a')
 
541
        self.assertEqual(len(wt.inventory), 2)
 
542
        os.unlink('b1/a')
 
543
        wt.revert([])
 
544
        self.assertEqual(len(wt.inventory), 1)