~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2010-11-09 18:19:47 UTC
  • mfrom: (5524.2.1 noshfolder)
  • Revision ID: pqm@pqm.ubuntu.com-20101109181947-h26505clmkdhh2uz
(GaryvdM) Exclude SHFOLDER.dll from py2exe builds,
        as it breaks subvertpy.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
 
 
1
# Copyright (C) 2005-2010 Canonical Ltd
 
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
5
5
# the Free Software Foundation; either version 2 of the License, or
6
6
# (at your option) any later version.
7
 
 
 
7
#
8
8
# This program is distributed in the hope that it will be useful,
9
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
11
# GNU General Public License for more details.
12
 
 
 
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
from cStringIO import StringIO
18
 
import os
19
 
 
20
 
from bzrlib.branch import Branch
21
 
from bzrlib.clone import copy_branch
22
 
import bzrlib.errors as errors
23
 
from bzrlib.diff import internal_diff
24
 
from bzrlib.inventory import Inventory, ROOT_ID
25
 
import bzrlib.inventory as inventory
26
 
from bzrlib.osutils import has_symlinks, rename
27
 
from bzrlib.tests import TestCase, TestCaseInTempDir
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
from bzrlib import (
 
19
    chk_map,
 
20
    groupcompress,
 
21
    errors,
 
22
    inventory,
 
23
    osutils,
 
24
    repository,
 
25
    revision,
 
26
    tests,
 
27
    )
 
28
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
 
29
    InventoryDirectory, InventoryEntry, TreeReference)
 
30
from bzrlib.tests import (
 
31
    TestCase,
 
32
    TestCaseWithTransport,
 
33
    condition_isinstance,
 
34
    multiply_tests,
 
35
    split_suite_by_condition,
 
36
    )
 
37
from bzrlib.tests.per_workingtree import workingtree_formats
 
38
 
 
39
 
 
40
def load_tests(standard_tests, module, loader):
 
41
    """Parameterise some inventory tests."""
 
42
    to_adapt, result = split_suite_by_condition(standard_tests,
 
43
        condition_isinstance(TestDeltaApplication))
 
44
    scenarios = [
 
45
        ('Inventory', {'apply_delta':apply_inventory_Inventory}),
 
46
        ]
 
47
    # Working tree basis delta application
 
48
    # Repository add_inv_by_delta.
 
49
    # Reduce form of the per_repository test logic - that logic needs to be
 
50
    # be able to get /just/ repositories whereas these tests are fine with
 
51
    # just creating trees.
 
52
    formats = set()
 
53
    for _, format in repository.format_registry.iteritems():
 
54
        scenarios.append((str(format.__name__), {
 
55
            'apply_delta':apply_inventory_Repository_add_inventory_by_delta,
 
56
            'format':format}))
 
57
    for format in workingtree_formats():
 
58
        scenarios.append(
 
59
            (str(format.__class__.__name__) + ".update_basis_by_delta", {
 
60
            'apply_delta':apply_inventory_WT_basis,
 
61
            'format':format}))
 
62
        scenarios.append(
 
63
            (str(format.__class__.__name__) + ".apply_inventory_delta", {
 
64
            'apply_delta':apply_inventory_WT,
 
65
            'format':format}))
 
66
    return multiply_tests(to_adapt, scenarios, result)
 
67
 
 
68
 
 
69
def create_texts_for_inv(repo, inv):
 
70
    for path, ie in inv.iter_entries():
 
71
        if ie.text_size:
 
72
            lines = ['a' * ie.text_size]
 
73
        else:
 
74
            lines = []
 
75
        repo.texts.add_lines((ie.file_id, ie.revision), [], lines)
 
76
    
 
77
def apply_inventory_Inventory(self, basis, delta):
 
78
    """Apply delta to basis and return the result.
 
79
    
 
80
    :param basis: An inventory to be used as the basis.
 
81
    :param delta: The inventory delta to apply:
 
82
    :return: An inventory resulting from the application.
 
83
    """
 
84
    basis.apply_delta(delta)
 
85
    return basis
 
86
 
 
87
 
 
88
def apply_inventory_WT(self, basis, delta):
 
89
    """Apply delta to basis and return the result.
 
90
 
 
91
    This sets the tree state to be basis, and then calls apply_inventory_delta.
 
92
    
 
93
    :param basis: An inventory to be used as the basis.
 
94
    :param delta: The inventory delta to apply:
 
95
    :return: An inventory resulting from the application.
 
96
    """
 
97
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
98
    control.create_repository()
 
99
    control.create_branch()
 
100
    tree = self.format.initialize(control)
 
101
    tree.lock_write()
 
102
    try:
 
103
        tree._write_inventory(basis)
 
104
    finally:
 
105
        tree.unlock()
 
106
    # Fresh object, reads disk again.
 
107
    tree = tree.bzrdir.open_workingtree()
 
108
    tree.lock_write()
 
109
    try:
 
110
        tree.apply_inventory_delta(delta)
 
111
    finally:
 
112
        tree.unlock()
 
113
    # reload tree - ensure we get what was written.
 
114
    tree = tree.bzrdir.open_workingtree()
 
115
    tree.lock_read()
 
116
    self.addCleanup(tree.unlock)
 
117
    # One could add 'tree._validate' here but that would cause 'early' failues 
 
118
    # as far as higher level code is concerned. Possibly adding an
 
119
    # expect_fail parameter to this function and if that is False then do a
 
120
    # validate call.
 
121
    return tree.inventory
 
122
 
 
123
 
 
124
def apply_inventory_WT_basis(self, basis, delta):
 
125
    """Apply delta to basis and return the result.
 
126
 
 
127
    This sets the parent and then calls update_basis_by_delta.
 
128
    It also puts the basis in the repository under both 'basis' and 'result' to
 
129
    allow safety checks made by the WT to succeed, and finally ensures that all
 
130
    items in the delta with a new path are present in the WT before calling
 
131
    update_basis_by_delta.
 
132
    
 
133
    :param basis: An inventory to be used as the basis.
 
134
    :param delta: The inventory delta to apply:
 
135
    :return: An inventory resulting from the application.
 
136
    """
 
137
    control = self.make_bzrdir('tree', format=self.format._matchingbzrdir)
 
138
    control.create_repository()
 
139
    control.create_branch()
 
140
    tree = self.format.initialize(control)
 
141
    tree.lock_write()
 
142
    try:
 
143
        repo = tree.branch.repository
 
144
        repo.start_write_group()
 
145
        try:
 
146
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
147
                message="", committer="foo@example.com")
 
148
            basis.revision_id = 'basis'
 
149
            create_texts_for_inv(tree.branch.repository, basis)
 
150
            repo.add_revision('basis', rev, basis)
 
151
            # Add a revision for the result, with the basis content - 
 
152
            # update_basis_by_delta doesn't check that the delta results in
 
153
            # result, and we want inconsistent deltas to get called on the
 
154
            # tree, or else the code isn't actually checked.
 
155
            rev = revision.Revision('result', timestamp=0, timezone=None,
 
156
                message="", committer="foo@example.com")
 
157
            basis.revision_id = 'result'
 
158
            repo.add_revision('result', rev, basis)
 
159
            repo.commit_write_group()
 
160
        except:
 
161
            repo.abort_write_group()
 
162
            raise
 
163
        # Set the basis state as the trees current state
 
164
        tree._write_inventory(basis)
 
165
        # This reads basis from the repo and puts it into the tree's local
 
166
        # cache, if it has one.
 
167
        tree.set_parent_ids(['basis'])
 
168
        paths = {}
 
169
        parents = set()
 
170
        for old, new, id, entry in delta:
 
171
            if None in (new, entry):
 
172
                continue
 
173
            paths[new] = (entry.file_id, entry.kind)
 
174
            parents.add(osutils.dirname(new))
 
175
        parents = osutils.minimum_path_selection(parents)
 
176
        parents.discard('')
 
177
        # Put place holders in the tree to permit adding the other entries.
 
178
        for pos, parent in enumerate(parents):
 
179
            if not tree.path2id(parent):
 
180
                # add a synthetic directory in the tree so we can can put the
 
181
                # tree0 entries in place for dirstate.
 
182
                tree.add([parent], ["id%d" % pos], ["directory"])
 
183
        if paths:
 
184
            # Many deltas may cause this mini-apply to fail, but we want to see what
 
185
            # the delta application code says, not the prep that we do to deal with 
 
186
            # limitations of dirstate's update_basis code.
 
187
            for path, (file_id, kind) in sorted(paths.items()):
 
188
                try:
 
189
                    tree.add([path], [file_id], [kind])
 
190
                except (KeyboardInterrupt, SystemExit):
 
191
                    raise
 
192
                except:
 
193
                    pass
 
194
    finally:
 
195
        tree.unlock()
 
196
    # Fresh lock, reads disk again.
 
197
    tree.lock_write()
 
198
    try:
 
199
        tree.update_basis_by_delta('result', delta)
 
200
    finally:
 
201
        tree.unlock()
 
202
    # reload tree - ensure we get what was written.
 
203
    tree = tree.bzrdir.open_workingtree()
 
204
    basis_tree = tree.basis_tree()
 
205
    basis_tree.lock_read()
 
206
    self.addCleanup(basis_tree.unlock)
 
207
    # Note, that if the tree does not have a local cache, the trick above of
 
208
    # setting the result as the basis, will come back to bite us. That said,
 
209
    # all the implementations in bzr do have a local cache.
 
210
    return basis_tree.inventory
 
211
 
 
212
 
 
213
def apply_inventory_Repository_add_inventory_by_delta(self, basis, delta):
 
214
    """Apply delta to basis and return the result.
 
215
    
 
216
    This inserts basis as a whole inventory and then uses
 
217
    add_inventory_by_delta to add delta.
 
218
 
 
219
    :param basis: An inventory to be used as the basis.
 
220
    :param delta: The inventory delta to apply:
 
221
    :return: An inventory resulting from the application.
 
222
    """
 
223
    format = self.format()
 
224
    control = self.make_bzrdir('tree', format=format._matchingbzrdir)
 
225
    repo = format.initialize(control)
 
226
    repo.lock_write()
 
227
    try:
 
228
        repo.start_write_group()
 
229
        try:
 
230
            rev = revision.Revision('basis', timestamp=0, timezone=None,
 
231
                message="", committer="foo@example.com")
 
232
            basis.revision_id = 'basis'
 
233
            create_texts_for_inv(repo, basis)
 
234
            repo.add_revision('basis', rev, basis)
 
235
            repo.commit_write_group()
 
236
        except:
 
237
            repo.abort_write_group()
 
238
            raise
 
239
    finally:
 
240
        repo.unlock()
 
241
    repo.lock_write()
 
242
    try:
 
243
        repo.start_write_group()
 
244
        try:
 
245
            inv_sha1 = repo.add_inventory_by_delta('basis', delta,
 
246
                'result', ['basis'])
 
247
        except:
 
248
            repo.abort_write_group()
 
249
            raise
 
250
        else:
 
251
            repo.commit_write_group()
 
252
    finally:
 
253
        repo.unlock()
 
254
    # Fresh lock, reads disk again.
 
255
    repo = repo.bzrdir.open_repository()
 
256
    repo.lock_read()
 
257
    self.addCleanup(repo.unlock)
 
258
    return repo.get_inventory('result')
 
259
 
 
260
 
 
261
class TestInventoryUpdates(TestCase):
 
262
 
 
263
    def test_creation_from_root_id(self):
 
264
        # iff a root id is passed to the constructor, a root directory is made
 
265
        inv = inventory.Inventory(root_id='tree-root')
 
266
        self.assertNotEqual(None, inv.root)
 
267
        self.assertEqual('tree-root', inv.root.file_id)
 
268
 
 
269
    def test_add_path_of_root(self):
 
270
        # if no root id is given at creation time, there is no root directory
 
271
        inv = inventory.Inventory(root_id=None)
 
272
        self.assertIs(None, inv.root)
 
273
        # add a root entry by adding its path
 
274
        ie = inv.add_path("", "directory", "my-root")
 
275
        ie.revision = 'test-rev'
 
276
        self.assertEqual("my-root", ie.file_id)
 
277
        self.assertIs(ie, inv.root)
 
278
 
 
279
    def test_add_path(self):
 
280
        inv = inventory.Inventory(root_id='tree_root')
 
281
        ie = inv.add_path('hello', 'file', 'hello-id')
 
282
        self.assertEqual('hello-id', ie.file_id)
 
283
        self.assertEqual('file', ie.kind)
 
284
 
 
285
    def test_copy(self):
 
286
        """Make sure copy() works and creates a deep copy."""
 
287
        inv = inventory.Inventory(root_id='some-tree-root')
 
288
        ie = inv.add_path('hello', 'file', 'hello-id')
 
289
        inv2 = inv.copy()
 
290
        inv.root.file_id = 'some-new-root'
 
291
        ie.name = 'file2'
 
292
        self.assertEqual('some-tree-root', inv2.root.file_id)
 
293
        self.assertEqual('hello', inv2['hello-id'].name)
 
294
 
 
295
    def test_copy_empty(self):
 
296
        """Make sure an empty inventory can be copied."""
 
297
        inv = inventory.Inventory(root_id=None)
 
298
        inv2 = inv.copy()
 
299
        self.assertIs(None, inv2.root)
 
300
 
 
301
    def test_copy_copies_root_revision(self):
 
302
        """Make sure the revision of the root gets copied."""
 
303
        inv = inventory.Inventory(root_id='someroot')
 
304
        inv.root.revision = 'therev'
 
305
        inv2 = inv.copy()
 
306
        self.assertEquals('someroot', inv2.root.file_id)
 
307
        self.assertEquals('therev', inv2.root.revision)
 
308
 
 
309
    def test_create_tree_reference(self):
 
310
        inv = inventory.Inventory('tree-root-123')
 
311
        inv.add(TreeReference('nested-id', 'nested', parent_id='tree-root-123',
 
312
                              revision='rev', reference_revision='rev2'))
 
313
 
 
314
    def test_error_encoding(self):
 
315
        inv = inventory.Inventory('tree-root')
 
316
        inv.add(InventoryFile('a-id', u'\u1234', 'tree-root'))
 
317
        e = self.assertRaises(errors.InconsistentDelta, inv.add,
 
318
            InventoryFile('b-id', u'\u1234', 'tree-root'))
 
319
        self.assertContainsRe(str(e), r'\\u1234')
 
320
 
 
321
    def test_add_recursive(self):
 
322
        parent = InventoryDirectory('src-id', 'src', 'tree-root')
 
323
        child = InventoryFile('hello-id', 'hello.c', 'src-id')
 
324
        parent.children[child.file_id] = child
 
325
        inv = inventory.Inventory('tree-root')
 
326
        inv.add(parent)
 
327
        self.assertEqual('src/hello.c', inv.id2path('hello-id'))
 
328
 
 
329
 
 
330
 
 
331
class TestDeltaApplication(TestCaseWithTransport):
 
332
 
 
333
    def get_empty_inventory(self, reference_inv=None):
 
334
        """Get an empty inventory.
 
335
 
 
336
        Note that tests should not depend on the revision of the root for
 
337
        setting up test conditions, as it has to be flexible to accomodate non
 
338
        rich root repositories.
 
339
 
 
340
        :param reference_inv: If not None, get the revision for the root from
 
341
            this inventory. This is useful for dealing with older repositories
 
342
            that routinely discarded the root entry data. If None, the root's
 
343
            revision is set to 'basis'.
 
344
        """
 
345
        inv = inventory.Inventory()
 
346
        if reference_inv is not None:
 
347
            inv.root.revision = reference_inv.root.revision
 
348
        else:
 
349
            inv.root.revision = 'basis'
 
350
        return inv
 
351
 
 
352
    def test_empty_delta(self):
 
353
        inv = self.get_empty_inventory()
 
354
        delta = []
 
355
        inv = self.apply_delta(self, inv, delta)
 
356
        inv2 = self.get_empty_inventory(inv)
 
357
        self.assertEqual([], inv2._make_delta(inv))
 
358
 
 
359
    def test_None_file_id(self):
 
360
        inv = self.get_empty_inventory()
 
361
        dir1 = inventory.InventoryDirectory(None, 'dir1', inv.root.file_id)
 
362
        dir1.revision = 'result'
 
363
        delta = [(None, u'dir1', None, dir1)]
 
364
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
365
            inv, delta)
 
366
 
 
367
    def test_unicode_file_id(self):
 
368
        inv = self.get_empty_inventory()
 
369
        dir1 = inventory.InventoryDirectory(u'dirid', 'dir1', inv.root.file_id)
 
370
        dir1.revision = 'result'
 
371
        delta = [(None, u'dir1', dir1.file_id, dir1)]
 
372
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
373
            inv, delta)
 
374
 
 
375
    def test_repeated_file_id(self):
 
376
        inv = self.get_empty_inventory()
 
377
        file1 = inventory.InventoryFile('id', 'path1', inv.root.file_id)
 
378
        file1.revision = 'result'
 
379
        file1.text_size = 0
 
380
        file1.text_sha1 = ""
 
381
        file2 = inventory.InventoryFile('id', 'path2', inv.root.file_id)
 
382
        file2.revision = 'result'
 
383
        file2.text_size = 0
 
384
        file2.text_sha1 = ""
 
385
        delta = [(None, u'path1', 'id', file1), (None, u'path2', 'id', file2)]
 
386
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
387
            inv, delta)
 
388
 
 
389
    def test_repeated_new_path(self):
 
390
        inv = self.get_empty_inventory()
 
391
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
392
        file1.revision = 'result'
 
393
        file1.text_size = 0
 
394
        file1.text_sha1 = ""
 
395
        file2 = inventory.InventoryFile('id2', 'path', inv.root.file_id)
 
396
        file2.revision = 'result'
 
397
        file2.text_size = 0
 
398
        file2.text_sha1 = ""
 
399
        delta = [(None, u'path', 'id1', file1), (None, u'path', 'id2', file2)]
 
400
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
401
            inv, delta)
 
402
 
 
403
    def test_repeated_old_path(self):
 
404
        inv = self.get_empty_inventory()
 
405
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
406
        file1.revision = 'result'
 
407
        file1.text_size = 0
 
408
        file1.text_sha1 = ""
 
409
        # We can't *create* a source inventory with the same path, but
 
410
        # a badly generated partial delta might claim the same source twice.
 
411
        # This would be buggy in two ways: the path is repeated in the delta,
 
412
        # And the path for one of the file ids doesn't match the source
 
413
        # location. Alternatively, we could have a repeated fileid, but that
 
414
        # is separately checked for.
 
415
        file2 = inventory.InventoryFile('id2', 'path2', inv.root.file_id)
 
416
        file2.revision = 'result'
 
417
        file2.text_size = 0
 
418
        file2.text_sha1 = ""
 
419
        inv.add(file1)
 
420
        inv.add(file2)
 
421
        delta = [(u'path', None, 'id1', None), (u'path', None, 'id2', None)]
 
422
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
423
            inv, delta)
 
424
 
 
425
    def test_mismatched_id_entry_id(self):
 
426
        inv = self.get_empty_inventory()
 
427
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
428
        file1.revision = 'result'
 
429
        file1.text_size = 0
 
430
        file1.text_sha1 = ""
 
431
        delta = [(None, u'path', 'id', file1)]
 
432
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
433
            inv, delta)
 
434
 
 
435
    def test_mismatched_new_path_entry_None(self):
 
436
        inv = self.get_empty_inventory()
 
437
        delta = [(None, u'path', 'id', None)]
 
438
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
439
            inv, delta)
 
440
 
 
441
    def test_mismatched_new_path_None_entry(self):
 
442
        inv = self.get_empty_inventory()
 
443
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
444
        file1.revision = 'result'
 
445
        file1.text_size = 0
 
446
        file1.text_sha1 = ""
 
447
        delta = [(u"path", None, 'id1', file1)]
 
448
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
449
            inv, delta)
 
450
 
 
451
    def test_parent_is_not_directory(self):
 
452
        inv = self.get_empty_inventory()
 
453
        file1 = inventory.InventoryFile('id1', 'path', inv.root.file_id)
 
454
        file1.revision = 'result'
 
455
        file1.text_size = 0
 
456
        file1.text_sha1 = ""
 
457
        file2 = inventory.InventoryFile('id2', 'path2', 'id1')
 
458
        file2.revision = 'result'
 
459
        file2.text_size = 0
 
460
        file2.text_sha1 = ""
 
461
        inv.add(file1)
 
462
        delta = [(None, u'path/path2', 'id2', file2)]
 
463
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
464
            inv, delta)
 
465
 
 
466
    def test_parent_is_missing(self):
 
467
        inv = self.get_empty_inventory()
 
468
        file2 = inventory.InventoryFile('id2', 'path2', 'missingparent')
 
469
        file2.revision = 'result'
 
470
        file2.text_size = 0
 
471
        file2.text_sha1 = ""
 
472
        delta = [(None, u'path/path2', 'id2', file2)]
 
473
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
474
            inv, delta)
 
475
 
 
476
    def test_new_parent_path_has_wrong_id(self):
 
477
        inv = self.get_empty_inventory()
 
478
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
479
        parent1.revision = 'result'
 
480
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
481
        parent2.revision = 'result'
 
482
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
483
        file1.revision = 'result'
 
484
        file1.text_size = 0
 
485
        file1.text_sha1 = ""
 
486
        inv.add(parent1)
 
487
        inv.add(parent2)
 
488
        # This delta claims that file1 is at dir/path, but actually its at
 
489
        # dir2/path if you follow the inventory parent structure.
 
490
        delta = [(None, u'dir/path', 'id', file1)]
 
491
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
492
            inv, delta)
 
493
 
 
494
    def test_old_parent_path_is_wrong(self):
 
495
        inv = self.get_empty_inventory()
 
496
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
497
        parent1.revision = 'result'
 
498
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
499
        parent2.revision = 'result'
 
500
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
501
        file1.revision = 'result'
 
502
        file1.text_size = 0
 
503
        file1.text_sha1 = ""
 
504
        inv.add(parent1)
 
505
        inv.add(parent2)
 
506
        inv.add(file1)
 
507
        # This delta claims that file1 was at dir/path, but actually it was at
 
508
        # dir2/path if you follow the inventory parent structure.
 
509
        delta = [(u'dir/path', None, 'id', None)]
 
510
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
511
            inv, delta)
 
512
 
 
513
    def test_old_parent_path_is_for_other_id(self):
 
514
        inv = self.get_empty_inventory()
 
515
        parent1 = inventory.InventoryDirectory('p-1', 'dir', inv.root.file_id)
 
516
        parent1.revision = 'result'
 
517
        parent2 = inventory.InventoryDirectory('p-2', 'dir2', inv.root.file_id)
 
518
        parent2.revision = 'result'
 
519
        file1 = inventory.InventoryFile('id', 'path', 'p-2')
 
520
        file1.revision = 'result'
 
521
        file1.text_size = 0
 
522
        file1.text_sha1 = ""
 
523
        file2 = inventory.InventoryFile('id2', 'path', 'p-1')
 
524
        file2.revision = 'result'
 
525
        file2.text_size = 0
 
526
        file2.text_sha1 = ""
 
527
        inv.add(parent1)
 
528
        inv.add(parent2)
 
529
        inv.add(file1)
 
530
        inv.add(file2)
 
531
        # This delta claims that file1 was at dir/path, but actually it was at
 
532
        # dir2/path if you follow the inventory parent structure. At dir/path
 
533
        # is another entry we should not delete.
 
534
        delta = [(u'dir/path', None, 'id', None)]
 
535
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
536
            inv, delta)
 
537
 
 
538
    def test_add_existing_id_new_path(self):
 
539
        inv = self.get_empty_inventory()
 
540
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
541
        parent1.revision = 'result'
 
542
        parent2 = inventory.InventoryDirectory('p-1', 'dir2', inv.root.file_id)
 
543
        parent2.revision = 'result'
 
544
        inv.add(parent1)
 
545
        delta = [(None, u'dir2', 'p-1', parent2)]
 
546
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
547
            inv, delta)
 
548
 
 
549
    def test_add_new_id_existing_path(self):
 
550
        inv = self.get_empty_inventory()
 
551
        parent1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
552
        parent1.revision = 'result'
 
553
        parent2 = inventory.InventoryDirectory('p-2', 'dir1', inv.root.file_id)
 
554
        parent2.revision = 'result'
 
555
        inv.add(parent1)
 
556
        delta = [(None, u'dir1', 'p-2', parent2)]
 
557
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
558
            inv, delta)
 
559
 
 
560
    def test_remove_dir_leaving_dangling_child(self):
 
561
        inv = self.get_empty_inventory()
 
562
        dir1 = inventory.InventoryDirectory('p-1', 'dir1', inv.root.file_id)
 
563
        dir1.revision = 'result'
 
564
        dir2 = inventory.InventoryDirectory('p-2', 'child1', 'p-1')
 
565
        dir2.revision = 'result'
 
566
        dir3 = inventory.InventoryDirectory('p-3', 'child2', 'p-1')
 
567
        dir3.revision = 'result'
 
568
        inv.add(dir1)
 
569
        inv.add(dir2)
 
570
        inv.add(dir3)
 
571
        delta = [(u'dir1', None, 'p-1', None),
 
572
            (u'dir1/child2', None, 'p-3', None)]
 
573
        self.assertRaises(errors.InconsistentDelta, self.apply_delta, self,
 
574
            inv, delta)
28
575
 
29
576
 
30
577
class TestInventory(TestCase):
31
578
 
32
 
    def test_is_within(self):
33
 
        from bzrlib.osutils import is_inside_any
34
 
 
35
 
        SRC_FOO_C = os.path.join('src', 'foo.c')
36
 
        for dirs, fn in [(['src', 'doc'], SRC_FOO_C),
37
 
                         (['src'], SRC_FOO_C),
38
 
                         (['src'], 'src'),
39
 
                         ]:
40
 
            self.assert_(is_inside_any(dirs, fn))
41
 
            
42
 
        for dirs, fn in [(['src'], 'srccontrol'),
43
 
                         (['src'], 'srccontrol/foo')]:
44
 
            self.assertFalse(is_inside_any(dirs, fn))
45
 
            
46
 
    def test_ids(self):
47
 
        """Test detection of files within selected directories."""
48
 
        inv = Inventory()
49
 
        
50
 
        for args in [('src', 'directory', 'src-id'), 
51
 
                     ('doc', 'directory', 'doc-id'), 
52
 
                     ('src/hello.c', 'file'),
53
 
                     ('src/bye.c', 'file', 'bye-id'),
54
 
                     ('Makefile', 'file')]:
55
 
            inv.add_path(*args)
56
 
            
57
 
        self.assertEqual(inv.path2id('src'), 'src-id')
58
 
        self.assertEqual(inv.path2id('src/bye.c'), 'bye-id')
59
 
        
60
 
        self.assert_('src-id' in inv)
61
 
 
62
 
 
63
 
    def test_version(self):
64
 
        """Inventory remembers the text's version."""
65
 
        inv = Inventory()
66
 
        ie = inv.add_path('foo.txt', 'file')
67
 
        ## XXX
 
579
    def test_is_root(self):
 
580
        """Ensure our root-checking code is accurate."""
 
581
        inv = inventory.Inventory('TREE_ROOT')
 
582
        self.assertTrue(inv.is_root('TREE_ROOT'))
 
583
        self.assertFalse(inv.is_root('booga'))
 
584
        inv.root.file_id = 'booga'
 
585
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
586
        self.assertTrue(inv.is_root('booga'))
 
587
        # works properly even if no root is set
 
588
        inv.root = None
 
589
        self.assertFalse(inv.is_root('TREE_ROOT'))
 
590
        self.assertFalse(inv.is_root('booga'))
 
591
 
 
592
    def test_entries_for_empty_inventory(self):
 
593
        """Test that entries() will not fail for an empty inventory"""
 
594
        inv = Inventory(root_id=None)
 
595
        self.assertEqual([], inv.entries())
68
596
 
69
597
 
70
598
class TestInventoryEntry(TestCase):
83
611
 
84
612
    def test_dir_detect_changes(self):
85
613
        left = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
86
 
        left.text_sha1 = 123
87
 
        left.executable = True
88
 
        left.symlink_target='foo'
89
614
        right = inventory.InventoryDirectory('123', 'hello.c', ROOT_ID)
90
 
        right.text_sha1 = 321
91
 
        right.symlink_target='bar'
92
615
        self.assertEqual((False, False), left.detect_changes(right))
93
616
        self.assertEqual((False, False), right.detect_changes(left))
94
617
 
108
631
 
109
632
    def test_symlink_detect_changes(self):
110
633
        left = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
111
 
        left.text_sha1 = 123
112
 
        left.executable = True
113
634
        left.symlink_target='foo'
114
635
        right = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
115
 
        right.text_sha1 = 321
116
636
        right.symlink_target='foo'
117
637
        self.assertEqual((False, False), left.detect_changes(right))
118
638
        self.assertEqual((False, False), right.detect_changes(left))
132
652
        link = inventory.InventoryLink('123', 'hello.c', ROOT_ID)
133
653
        self.failIf(link.has_text())
134
654
 
135
 
 
136
 
class TestEntryDiffing(TestCaseInTempDir):
137
 
 
138
 
    def setUp(self):
139
 
        super(TestEntryDiffing, self).setUp()
140
 
        self.branch = Branch.initialize(u'.')
141
 
        self.wt = self.branch.working_tree()
142
 
        print >> open('file', 'wb'), 'foo'
143
 
        self.branch.working_tree().add(['file'], ['fileid'])
144
 
        if has_symlinks():
145
 
            os.symlink('target1', 'symlink')
146
 
            self.branch.working_tree().add(['symlink'], ['linkid'])
147
 
        self.wt.commit('message_1', rev_id = '1')
148
 
        print >> open('file', 'wb'), 'bar'
149
 
        if has_symlinks():
150
 
            os.unlink('symlink')
151
 
            os.symlink('target2', 'symlink')
152
 
        self.tree_1 = self.branch.revision_tree('1')
153
 
        self.inv_1 = self.branch.get_inventory('1')
154
 
        self.file_1 = self.inv_1['fileid']
155
 
        self.tree_2 = self.branch.working_tree()
156
 
        self.inv_2 = self.tree_2.read_working_inventory()
157
 
        self.file_2 = self.inv_2['fileid']
158
 
        if has_symlinks():
159
 
            self.link_1 = self.inv_1['linkid']
160
 
            self.link_2 = self.inv_2['linkid']
161
 
 
162
 
    def test_file_diff_deleted(self):
163
 
        output = StringIO()
164
 
        self.file_1.diff(internal_diff, 
165
 
                          "old_label", self.tree_1,
166
 
                          "/dev/null", None, None,
167
 
                          output)
168
 
        self.assertEqual(output.getvalue(), "--- old_label\t\n"
169
 
                                            "+++ /dev/null\t\n"
170
 
                                            "@@ -1,1 +0,0 @@\n"
171
 
                                            "-foo\n"
172
 
                                            "\n")
173
 
 
174
 
    def test_file_diff_added(self):
175
 
        output = StringIO()
176
 
        self.file_1.diff(internal_diff, 
177
 
                          "new_label", self.tree_1,
178
 
                          "/dev/null", None, None,
179
 
                          output, reverse=True)
180
 
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
181
 
                                            "+++ new_label\t\n"
182
 
                                            "@@ -0,0 +1,1 @@\n"
183
 
                                            "+foo\n"
184
 
                                            "\n")
185
 
 
186
 
    def test_file_diff_changed(self):
187
 
        output = StringIO()
188
 
        self.file_1.diff(internal_diff, 
189
 
                          "/dev/null", self.tree_1, 
190
 
                          "new_label", self.file_2, self.tree_2,
191
 
                          output)
192
 
        self.assertEqual(output.getvalue(), "--- /dev/null\t\n"
193
 
                                            "+++ new_label\t\n"
194
 
                                            "@@ -1,1 +1,1 @@\n"
195
 
                                            "-foo\n"
196
 
                                            "+bar\n"
197
 
                                            "\n")
198
 
        
199
 
    def test_link_diff_deleted(self):
200
 
        if not has_symlinks():
201
 
            return
202
 
        output = StringIO()
203
 
        self.link_1.diff(internal_diff, 
204
 
                          "old_label", self.tree_1,
205
 
                          "/dev/null", None, None,
206
 
                          output)
207
 
        self.assertEqual(output.getvalue(),
208
 
                         "=== target was 'target1'\n")
209
 
 
210
 
    def test_link_diff_added(self):
211
 
        if not has_symlinks():
212
 
            return
213
 
        output = StringIO()
214
 
        self.link_1.diff(internal_diff, 
215
 
                          "new_label", self.tree_1,
216
 
                          "/dev/null", None, None,
217
 
                          output, reverse=True)
218
 
        self.assertEqual(output.getvalue(),
219
 
                         "=== target is 'target1'\n")
220
 
 
221
 
    def test_link_diff_changed(self):
222
 
        if not has_symlinks():
223
 
            return
224
 
        output = StringIO()
225
 
        self.link_1.diff(internal_diff, 
226
 
                          "/dev/null", self.tree_1, 
227
 
                          "new_label", self.link_2, self.tree_2,
228
 
                          output)
229
 
        self.assertEqual(output.getvalue(),
230
 
                         "=== target changed 'target1' => 'target2'\n")
231
 
 
232
 
 
233
 
class TestSnapshot(TestCaseInTempDir):
234
 
 
235
 
    def setUp(self):
236
 
        # for full testing we'll need a branch
237
 
        # with a subdir to test parent changes.
238
 
        # and a file, link and dir under that.
239
 
        # but right now I only need one attribute
240
 
        # to change, and then test merge patterns
241
 
        # with fake parent entries.
242
 
        super(TestSnapshot, self).setUp()
243
 
        self.branch = Branch.initialize(u'.')
244
 
        self.build_tree(['subdir/', 'subdir/file'], line_endings='binary')
245
 
        self.branch.working_tree().add(['subdir', 'subdir/file'],
246
 
                                       ['dirid', 'fileid'])
247
 
        if has_symlinks():
248
 
            pass
249
 
        self.wt = self.branch.working_tree()
250
 
        self.wt.commit('message_1', rev_id = '1')
251
 
        self.tree_1 = self.branch.revision_tree('1')
252
 
        self.inv_1 = self.branch.get_inventory('1')
253
 
        self.file_1 = self.inv_1['fileid']
254
 
        self.work_tree = self.branch.working_tree()
255
 
        self.file_active = self.work_tree.inventory['fileid']
256
 
 
257
 
    def test_snapshot_new_revision(self):
258
 
        # This tests that a simple commit with no parents makes a new
259
 
        # revision value in the inventory entry
260
 
        self.file_active.snapshot('2', 'subdir/file', {}, self.work_tree, 
261
 
                                  self.branch.weave_store,
262
 
                                  self.branch.get_transaction())
263
 
        # expected outcome - file_1 has a revision id of '2', and we can get
264
 
        # its text of 'file contents' out of the weave.
265
 
        self.assertEqual(self.file_1.revision, '1')
266
 
        self.assertEqual(self.file_active.revision, '2')
267
 
        # this should be a separate test probably, but lets check it once..
268
 
        lines = self.branch.weave_store.get_lines('fileid','2',
269
 
            self.branch.get_transaction())
270
 
        self.assertEqual(lines, ['contents of subdir/file\n'])
271
 
 
272
 
    def test_snapshot_unchanged(self):
273
 
        #This tests that a simple commit does not make a new entry for
274
 
        # an unchanged inventory entry
275
 
        self.file_active.snapshot('2', 'subdir/file', {'1':self.file_1},
276
 
                                  self.work_tree, self.branch.weave_store,
277
 
                                  self.branch.get_transaction())
278
 
        self.assertEqual(self.file_1.revision, '1')
279
 
        self.assertEqual(self.file_active.revision, '1')
280
 
        self.assertRaises(errors.WeaveError,
281
 
                          self.branch.weave_store.get_lines, 'fileid', '2',
282
 
                          self.branch.get_transaction())
283
 
 
284
 
    def test_snapshot_merge_identical_different_revid(self):
285
 
        # This tests that a commit with two identical parents, one of which has
286
 
        # a different revision id, results in a new revision id in the entry.
287
 
        # 1->other, commit a merge of other against 1, results in 2.
288
 
        other_ie = inventory.InventoryFile('fileid', 'newname', self.file_1.parent_id)
289
 
        other_ie = inventory.InventoryFile('fileid', 'file', self.file_1.parent_id)
290
 
        other_ie.revision = '1'
291
 
        other_ie.text_sha1 = self.file_1.text_sha1
292
 
        other_ie.text_size = self.file_1.text_size
293
 
        self.assertEqual(self.file_1, other_ie)
294
 
        other_ie.revision = 'other'
295
 
        self.assertNotEqual(self.file_1, other_ie)
296
 
        self.branch.weave_store.add_identical_text('fileid', '1', 'other', ['1'],
297
 
            self.branch.get_transaction())
298
 
        self.file_active.snapshot('2', 'subdir/file', 
299
 
                                  {'1':self.file_1, 'other':other_ie},
300
 
                                  self.work_tree, self.branch.weave_store,
301
 
                                  self.branch.get_transaction())
302
 
        self.assertEqual(self.file_active.revision, '2')
303
 
 
304
 
    def test_snapshot_changed(self):
305
 
        # This tests that a commit with one different parent results in a new
306
 
        # revision id in the entry.
307
 
        self.file_active.name='newname'
308
 
        rename('subdir/file', 'subdir/newname')
309
 
        self.file_active.snapshot('2', 'subdir/newname', {'1':self.file_1}, 
310
 
                                  self.work_tree, 
311
 
                                  self.branch.weave_store,
312
 
                                  self.branch.get_transaction())
313
 
        # expected outcome - file_1 has a revision id of '2'
314
 
        self.assertEqual(self.file_active.revision, '2')
315
 
 
316
 
 
317
 
class TestPreviousHeads(TestCaseInTempDir):
318
 
 
319
 
    def setUp(self):
320
 
        # we want several inventories, that respectively
321
 
        # give use the following scenarios:
322
 
        # A) fileid not in any inventory (A),
323
 
        # B) fileid present in one inventory (B) and (A,B)
324
 
        # C) fileid present in two inventories, and they
325
 
        #   are not mutual descendents (B, C)
326
 
        # D) fileid present in two inventories and one is
327
 
        #   a descendent of the other. (B, D)
328
 
        super(TestPreviousHeads, self).setUp()
329
 
        self.build_tree(['file'])
330
 
        self.branch = Branch.initialize(u'.')
331
 
        self.wt = self.branch.working_tree()
332
 
        self.wt.commit('new branch', allow_pointless=True, rev_id='A')
333
 
        self.inv_A = self.branch.get_inventory('A')
334
 
        self.branch.working_tree().add(['file'], ['fileid'])
335
 
        self.wt.commit('add file', rev_id='B')
336
 
        self.inv_B = self.branch.get_inventory('B')
337
 
        self.branch.put_controlfile('revision-history', 'A\n')
338
 
        self.assertEqual(self.branch.revision_history(), ['A'])
339
 
        self.wt.commit('another add of file', rev_id='C')
340
 
        self.inv_C = self.branch.get_inventory('C')
341
 
        self.wt.add_pending_merge('B')
342
 
        self.wt.commit('merge in B', rev_id='D')
343
 
        self.inv_D = self.branch.get_inventory('D')
344
 
        self.file_active = self.wt.inventory['fileid']
345
 
        self.weave = self.branch.weave_store.get_weave('fileid',
346
 
            self.branch.get_transaction())
347
 
        
348
 
    def get_previous_heads(self, inventories):
349
 
        return self.file_active.find_previous_heads(inventories, self.weave)
350
 
        
351
 
    def test_fileid_in_no_inventory(self):
352
 
        self.assertEqual({}, self.get_previous_heads([self.inv_A]))
353
 
 
354
 
    def test_fileid_in_one_inventory(self):
355
 
        self.assertEqual({'B':self.inv_B['fileid']},
356
 
                         self.get_previous_heads([self.inv_B]))
357
 
        self.assertEqual({'B':self.inv_B['fileid']},
358
 
                         self.get_previous_heads([self.inv_A, self.inv_B]))
359
 
        self.assertEqual({'B':self.inv_B['fileid']},
360
 
                         self.get_previous_heads([self.inv_B, self.inv_A]))
361
 
 
362
 
    def test_fileid_in_two_inventories_gives_both_entries(self):
363
 
        self.assertEqual({'B':self.inv_B['fileid'],
364
 
                          'C':self.inv_C['fileid']},
365
 
                          self.get_previous_heads([self.inv_B, self.inv_C]))
366
 
        self.assertEqual({'B':self.inv_B['fileid'],
367
 
                          'C':self.inv_C['fileid']},
368
 
                          self.get_previous_heads([self.inv_C, self.inv_B]))
369
 
 
370
 
    def test_fileid_in_two_inventories_already_merged_gives_head(self):
371
 
        self.assertEqual({'D':self.inv_D['fileid']},
372
 
                         self.get_previous_heads([self.inv_B, self.inv_D]))
373
 
        self.assertEqual({'D':self.inv_D['fileid']},
374
 
                         self.get_previous_heads([self.inv_D, self.inv_B]))
375
 
 
376
 
    # TODO: test two inventories with the same file revision 
 
655
    def test_make_entry(self):
 
656
        self.assertIsInstance(inventory.make_entry("file", "name", ROOT_ID),
 
657
            inventory.InventoryFile)
 
658
        self.assertIsInstance(inventory.make_entry("symlink", "name", ROOT_ID),
 
659
            inventory.InventoryLink)
 
660
        self.assertIsInstance(inventory.make_entry("directory", "name", ROOT_ID),
 
661
            inventory.InventoryDirectory)
 
662
 
 
663
    def test_make_entry_non_normalized(self):
 
664
        orig_normalized_filename = osutils.normalized_filename
 
665
 
 
666
        try:
 
667
            osutils.normalized_filename = osutils._accessible_normalized_filename
 
668
            entry = inventory.make_entry("file", u'a\u030a', ROOT_ID)
 
669
            self.assertEqual(u'\xe5', entry.name)
 
670
            self.assertIsInstance(entry, inventory.InventoryFile)
 
671
 
 
672
            osutils.normalized_filename = osutils._inaccessible_normalized_filename
 
673
            self.assertRaises(errors.InvalidNormalization,
 
674
                    inventory.make_entry, 'file', u'a\u030a', ROOT_ID)
 
675
        finally:
 
676
            osutils.normalized_filename = orig_normalized_filename
 
677
 
 
678
 
 
679
class TestDescribeChanges(TestCase):
 
680
 
 
681
    def test_describe_change(self):
 
682
        # we need to test the following change combinations:
 
683
        # rename
 
684
        # reparent
 
685
        # modify
 
686
        # gone
 
687
        # added
 
688
        # renamed/reparented and modified
 
689
        # change kind (perhaps can't be done yet?)
 
690
        # also, merged in combination with all of these?
 
691
        old_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
692
        old_a.text_sha1 = '123132'
 
693
        old_a.text_size = 0
 
694
        new_a = InventoryFile('a-id', 'a_file', ROOT_ID)
 
695
        new_a.text_sha1 = '123132'
 
696
        new_a.text_size = 0
 
697
 
 
698
        self.assertChangeDescription('unchanged', old_a, new_a)
 
699
 
 
700
        new_a.text_size = 10
 
701
        new_a.text_sha1 = 'abcabc'
 
702
        self.assertChangeDescription('modified', old_a, new_a)
 
703
 
 
704
        self.assertChangeDescription('added', None, new_a)
 
705
        self.assertChangeDescription('removed', old_a, None)
 
706
        # perhaps a bit questionable but seems like the most reasonable thing...
 
707
        self.assertChangeDescription('unchanged', None, None)
 
708
 
 
709
        # in this case it's both renamed and modified; show a rename and
 
710
        # modification:
 
711
        new_a.name = 'newfilename'
 
712
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
713
 
 
714
        # reparenting is 'renaming'
 
715
        new_a.name = old_a.name
 
716
        new_a.parent_id = 'somedir-id'
 
717
        self.assertChangeDescription('modified and renamed', old_a, new_a)
 
718
 
 
719
        # reset the content values so its not modified
 
720
        new_a.text_size = old_a.text_size
 
721
        new_a.text_sha1 = old_a.text_sha1
 
722
        new_a.name = old_a.name
 
723
 
 
724
        new_a.name = 'newfilename'
 
725
        self.assertChangeDescription('renamed', old_a, new_a)
 
726
 
 
727
        # reparenting is 'renaming'
 
728
        new_a.name = old_a.name
 
729
        new_a.parent_id = 'somedir-id'
 
730
        self.assertChangeDescription('renamed', old_a, new_a)
 
731
 
 
732
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
 
733
        change = InventoryEntry.describe_change(old_ie, new_ie)
 
734
        self.assertEqual(expected_change, change)
 
735
 
 
736
 
 
737
class TestCHKInventory(tests.TestCaseWithMemoryTransport):
 
738
 
 
739
    def get_chk_bytes(self):
 
740
        factory = groupcompress.make_pack_factory(True, True, 1)
 
741
        trans = self.get_transport('')
 
742
        return factory(trans)
 
743
 
 
744
    def read_bytes(self, chk_bytes, key):
 
745
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
746
        return stream.next().get_bytes_as("fulltext")
 
747
 
 
748
    def test_deserialise_gives_CHKInventory(self):
 
749
        inv = Inventory()
 
750
        inv.revision_id = "revid"
 
751
        inv.root.revision = "rootrev"
 
752
        chk_bytes = self.get_chk_bytes()
 
753
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
754
        bytes = ''.join(chk_inv.to_lines())
 
755
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
756
        self.assertEqual("revid", new_inv.revision_id)
 
757
        self.assertEqual("directory", new_inv.root.kind)
 
758
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
759
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
760
        self.assertEqual(inv.root.name, new_inv.root.name)
 
761
        self.assertEqual("rootrev", new_inv.root.revision)
 
762
        self.assertEqual('plain', new_inv._search_key_name)
 
763
 
 
764
    def test_deserialise_wrong_revid(self):
 
765
        inv = Inventory()
 
766
        inv.revision_id = "revid"
 
767
        inv.root.revision = "rootrev"
 
768
        chk_bytes = self.get_chk_bytes()
 
769
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
770
        bytes = ''.join(chk_inv.to_lines())
 
771
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
772
            bytes, ("revid2",))
 
773
 
 
774
    def test_captures_rev_root_byid(self):
 
775
        inv = Inventory()
 
776
        inv.revision_id = "foo"
 
777
        inv.root.revision = "bar"
 
778
        chk_bytes = self.get_chk_bytes()
 
779
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
780
        lines = chk_inv.to_lines()
 
781
        self.assertEqual([
 
782
            'chkinventory:\n',
 
783
            'revision_id: foo\n',
 
784
            'root_id: TREE_ROOT\n',
 
785
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
786
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
787
            ], lines)
 
788
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
789
        self.assertEqual('plain', chk_inv._search_key_name)
 
790
 
 
791
    def test_captures_parent_id_basename_index(self):
 
792
        inv = Inventory()
 
793
        inv.revision_id = "foo"
 
794
        inv.root.revision = "bar"
 
795
        chk_bytes = self.get_chk_bytes()
 
796
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
797
        lines = chk_inv.to_lines()
 
798
        self.assertEqual([
 
799
            'chkinventory:\n',
 
800
            'revision_id: foo\n',
 
801
            'root_id: TREE_ROOT\n',
 
802
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
803
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
804
            ], lines)
 
805
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
806
        self.assertEqual('plain', chk_inv._search_key_name)
 
807
 
 
808
    def test_captures_search_key_name(self):
 
809
        inv = Inventory()
 
810
        inv.revision_id = "foo"
 
811
        inv.root.revision = "bar"
 
812
        chk_bytes = self.get_chk_bytes()
 
813
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
814
                                              search_key_name='hash-16-way')
 
815
        lines = chk_inv.to_lines()
 
816
        self.assertEqual([
 
817
            'chkinventory:\n',
 
818
            'search_key_name: hash-16-way\n',
 
819
            'root_id: TREE_ROOT\n',
 
820
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
821
            'revision_id: foo\n',
 
822
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
823
            ], lines)
 
824
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
825
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
826
 
 
827
    def test_directory_children_on_demand(self):
 
828
        inv = Inventory()
 
829
        inv.revision_id = "revid"
 
830
        inv.root.revision = "rootrev"
 
831
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
832
        inv["fileid"].revision = "filerev"
 
833
        inv["fileid"].executable = True
 
834
        inv["fileid"].text_sha1 = "ffff"
 
835
        inv["fileid"].text_size = 1
 
836
        chk_bytes = self.get_chk_bytes()
 
837
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
838
        bytes = ''.join(chk_inv.to_lines())
 
839
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
840
        root_entry = new_inv[inv.root.file_id]
 
841
        self.assertEqual(None, root_entry._children)
 
842
        self.assertEqual(['file'], root_entry.children.keys())
 
843
        file_direct = new_inv["fileid"]
 
844
        file_found = root_entry.children['file']
 
845
        self.assertEqual(file_direct.kind, file_found.kind)
 
846
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
847
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
848
        self.assertEqual(file_direct.name, file_found.name)
 
849
        self.assertEqual(file_direct.revision, file_found.revision)
 
850
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
851
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
852
        self.assertEqual(file_direct.executable, file_found.executable)
 
853
 
 
854
    def test_from_inventory_maximum_size(self):
 
855
        # from_inventory supports the maximum_size parameter.
 
856
        inv = Inventory()
 
857
        inv.revision_id = "revid"
 
858
        inv.root.revision = "rootrev"
 
859
        chk_bytes = self.get_chk_bytes()
 
860
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
861
        chk_inv.id_to_entry._ensure_root()
 
862
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
863
        self.assertEqual(1, chk_inv.id_to_entry._root_node._key_width)
 
864
        p_id_basename = chk_inv.parent_id_basename_to_file_id
 
865
        p_id_basename._ensure_root()
 
866
        self.assertEqual(120, p_id_basename._root_node.maximum_size)
 
867
        self.assertEqual(2, p_id_basename._root_node._key_width)
 
868
 
 
869
    def test___iter__(self):
 
870
        inv = Inventory()
 
871
        inv.revision_id = "revid"
 
872
        inv.root.revision = "rootrev"
 
873
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
874
        inv["fileid"].revision = "filerev"
 
875
        inv["fileid"].executable = True
 
876
        inv["fileid"].text_sha1 = "ffff"
 
877
        inv["fileid"].text_size = 1
 
878
        chk_bytes = self.get_chk_bytes()
 
879
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
880
        bytes = ''.join(chk_inv.to_lines())
 
881
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
882
        fileids = list(new_inv.__iter__())
 
883
        fileids.sort()
 
884
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
885
 
 
886
    def test__len__(self):
 
887
        inv = Inventory()
 
888
        inv.revision_id = "revid"
 
889
        inv.root.revision = "rootrev"
 
890
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
891
        inv["fileid"].revision = "filerev"
 
892
        inv["fileid"].executable = True
 
893
        inv["fileid"].text_sha1 = "ffff"
 
894
        inv["fileid"].text_size = 1
 
895
        chk_bytes = self.get_chk_bytes()
 
896
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
897
        self.assertEqual(2, len(chk_inv))
 
898
 
 
899
    def test___getitem__(self):
 
900
        inv = Inventory()
 
901
        inv.revision_id = "revid"
 
902
        inv.root.revision = "rootrev"
 
903
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
904
        inv["fileid"].revision = "filerev"
 
905
        inv["fileid"].executable = True
 
906
        inv["fileid"].text_sha1 = "ffff"
 
907
        inv["fileid"].text_size = 1
 
908
        chk_bytes = self.get_chk_bytes()
 
909
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
910
        bytes = ''.join(chk_inv.to_lines())
 
911
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
912
        root_entry = new_inv[inv.root.file_id]
 
913
        file_entry = new_inv["fileid"]
 
914
        self.assertEqual("directory", root_entry.kind)
 
915
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
916
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
917
        self.assertEqual(inv.root.name, root_entry.name)
 
918
        self.assertEqual("rootrev", root_entry.revision)
 
919
        self.assertEqual("file", file_entry.kind)
 
920
        self.assertEqual("fileid", file_entry.file_id)
 
921
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
922
        self.assertEqual("file", file_entry.name)
 
923
        self.assertEqual("filerev", file_entry.revision)
 
924
        self.assertEqual("ffff", file_entry.text_sha1)
 
925
        self.assertEqual(1, file_entry.text_size)
 
926
        self.assertEqual(True, file_entry.executable)
 
927
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
928
 
 
929
    def test_has_id_true(self):
 
930
        inv = Inventory()
 
931
        inv.revision_id = "revid"
 
932
        inv.root.revision = "rootrev"
 
933
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
934
        inv["fileid"].revision = "filerev"
 
935
        inv["fileid"].executable = True
 
936
        inv["fileid"].text_sha1 = "ffff"
 
937
        inv["fileid"].text_size = 1
 
938
        chk_bytes = self.get_chk_bytes()
 
939
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
940
        self.assertTrue(chk_inv.has_id('fileid'))
 
941
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
942
 
 
943
    def test_has_id_not(self):
 
944
        inv = Inventory()
 
945
        inv.revision_id = "revid"
 
946
        inv.root.revision = "rootrev"
 
947
        chk_bytes = self.get_chk_bytes()
 
948
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
949
        self.assertFalse(chk_inv.has_id('fileid'))
 
950
 
 
951
    def test_id2path(self):
 
952
        inv = Inventory()
 
953
        inv.revision_id = "revid"
 
954
        inv.root.revision = "rootrev"
 
955
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
956
        fileentry = InventoryFile("fileid", "file", "dirid")
 
957
        inv.add(direntry)
 
958
        inv.add(fileentry)
 
959
        inv["fileid"].revision = "filerev"
 
960
        inv["fileid"].executable = True
 
961
        inv["fileid"].text_sha1 = "ffff"
 
962
        inv["fileid"].text_size = 1
 
963
        inv["dirid"].revision = "filerev"
 
964
        chk_bytes = self.get_chk_bytes()
 
965
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
966
        bytes = ''.join(chk_inv.to_lines())
 
967
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
968
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
969
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
970
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
971
 
 
972
    def test_path2id(self):
 
973
        inv = Inventory()
 
974
        inv.revision_id = "revid"
 
975
        inv.root.revision = "rootrev"
 
976
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
977
        fileentry = InventoryFile("fileid", "file", "dirid")
 
978
        inv.add(direntry)
 
979
        inv.add(fileentry)
 
980
        inv["fileid"].revision = "filerev"
 
981
        inv["fileid"].executable = True
 
982
        inv["fileid"].text_sha1 = "ffff"
 
983
        inv["fileid"].text_size = 1
 
984
        inv["dirid"].revision = "filerev"
 
985
        chk_bytes = self.get_chk_bytes()
 
986
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
987
        bytes = ''.join(chk_inv.to_lines())
 
988
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
989
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
990
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
991
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
992
 
 
993
    def test_create_by_apply_delta_sets_root(self):
 
994
        inv = Inventory()
 
995
        inv.revision_id = "revid"
 
996
        chk_bytes = self.get_chk_bytes()
 
997
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
998
        inv.add_path("", "directory", "myrootid", None)
 
999
        inv.revision_id = "expectedid"
 
1000
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1001
        delta = [("", None, base_inv.root.file_id, None),
 
1002
            (None, "",  "myrootid", inv.root)]
 
1003
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1004
        self.assertEquals(reference_inv.root, new_inv.root)
 
1005
 
 
1006
    def test_create_by_apply_delta_empty_add_child(self):
 
1007
        inv = Inventory()
 
1008
        inv.revision_id = "revid"
 
1009
        inv.root.revision = "rootrev"
 
1010
        chk_bytes = self.get_chk_bytes()
 
1011
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1012
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1013
        a_entry.revision = "filerev"
 
1014
        a_entry.executable = True
 
1015
        a_entry.text_sha1 = "ffff"
 
1016
        a_entry.text_size = 1
 
1017
        inv.add(a_entry)
 
1018
        inv.revision_id = "expectedid"
 
1019
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1020
        delta = [(None, "A",  "A-id", a_entry)]
 
1021
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1022
        # new_inv should be the same as reference_inv.
 
1023
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1024
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1025
        reference_inv.id_to_entry._ensure_root()
 
1026
        new_inv.id_to_entry._ensure_root()
 
1027
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1028
            new_inv.id_to_entry._root_node._key)
 
1029
 
 
1030
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
1031
        inv = Inventory()
 
1032
        inv.revision_id = "revid"
 
1033
        inv.root.revision = "rootrev"
 
1034
        chk_bytes = self.get_chk_bytes()
 
1035
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1036
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
1037
        a_entry.revision = "filerev"
 
1038
        a_entry.executable = True
 
1039
        a_entry.text_sha1 = "ffff"
 
1040
        a_entry.text_size = 1
 
1041
        inv.add(a_entry)
 
1042
        inv.revision_id = "expectedid"
 
1043
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1044
        delta = [(None, "A",  "A-id", a_entry)]
 
1045
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
1046
        reference_inv.id_to_entry._ensure_root()
 
1047
        reference_inv.parent_id_basename_to_file_id._ensure_root()
 
1048
        new_inv.id_to_entry._ensure_root()
 
1049
        new_inv.parent_id_basename_to_file_id._ensure_root()
 
1050
        # new_inv should be the same as reference_inv.
 
1051
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
1052
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
1053
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
1054
            new_inv.id_to_entry._root_node._key)
 
1055
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
1056
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
1057
 
 
1058
    def test_iter_changes(self):
 
1059
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
1060
        # InterTree are coming.
 
1061
        inv = Inventory()
 
1062
        inv.revision_id = "revid"
 
1063
        inv.root.revision = "rootrev"
 
1064
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1065
        inv["fileid"].revision = "filerev"
 
1066
        inv["fileid"].executable = True
 
1067
        inv["fileid"].text_sha1 = "ffff"
 
1068
        inv["fileid"].text_size = 1
 
1069
        inv2 = Inventory()
 
1070
        inv2.revision_id = "revid2"
 
1071
        inv2.root.revision = "rootrev"
 
1072
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1073
        inv2["fileid"].revision = "filerev2"
 
1074
        inv2["fileid"].executable = False
 
1075
        inv2["fileid"].text_sha1 = "bbbb"
 
1076
        inv2["fileid"].text_size = 2
 
1077
        # get fresh objects.
 
1078
        chk_bytes = self.get_chk_bytes()
 
1079
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1080
        bytes = ''.join(chk_inv.to_lines())
 
1081
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1082
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
1083
        bytes = ''.join(chk_inv2.to_lines())
 
1084
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
1085
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
1086
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
1087
            (False, True))],
 
1088
            list(inv_1.iter_changes(inv_2)))
 
1089
 
 
1090
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
1091
        inv = Inventory()
 
1092
        inv.revision_id = "revid"
 
1093
        inv.root.revision = "rootrev"
 
1094
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
1095
        inv["fileid"].revision = "filerev"
 
1096
        inv["fileid"].executable = True
 
1097
        inv["fileid"].text_sha1 = "ffff"
 
1098
        inv["fileid"].text_size = 1
 
1099
        # get fresh objects.
 
1100
        chk_bytes = self.get_chk_bytes()
 
1101
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
1102
        bytes = ''.join(tmp_inv.to_lines())
 
1103
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1104
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
1105
        self.assertEqual(
 
1106
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
1107
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))
 
1108
 
 
1109
    def test_file_entry_to_bytes(self):
 
1110
        inv = CHKInventory(None)
 
1111
        ie = inventory.InventoryFile('file-id', 'filename', 'parent-id')
 
1112
        ie.executable = True
 
1113
        ie.revision = 'file-rev-id'
 
1114
        ie.text_sha1 = 'abcdefgh'
 
1115
        ie.text_size = 100
 
1116
        bytes = inv._entry_to_bytes(ie)
 
1117
        self.assertEqual('file: file-id\nparent-id\nfilename\n'
 
1118
                         'file-rev-id\nabcdefgh\n100\nY', bytes)
 
1119
        ie2 = inv._bytes_to_entry(bytes)
 
1120
        self.assertEqual(ie, ie2)
 
1121
        self.assertIsInstance(ie2.name, unicode)
 
1122
        self.assertEqual(('filename', 'file-id', 'file-rev-id'),
 
1123
                         inv._bytes_to_utf8name_key(bytes))
 
1124
 
 
1125
    def test_file2_entry_to_bytes(self):
 
1126
        inv = CHKInventory(None)
 
1127
        # \u30a9 == 'omega'
 
1128
        ie = inventory.InventoryFile('file-id', u'\u03a9name', 'parent-id')
 
1129
        ie.executable = False
 
1130
        ie.revision = 'file-rev-id'
 
1131
        ie.text_sha1 = '123456'
 
1132
        ie.text_size = 25
 
1133
        bytes = inv._entry_to_bytes(ie)
 
1134
        self.assertEqual('file: file-id\nparent-id\n\xce\xa9name\n'
 
1135
                         'file-rev-id\n123456\n25\nN', bytes)
 
1136
        ie2 = inv._bytes_to_entry(bytes)
 
1137
        self.assertEqual(ie, ie2)
 
1138
        self.assertIsInstance(ie2.name, unicode)
 
1139
        self.assertEqual(('\xce\xa9name', 'file-id', 'file-rev-id'),
 
1140
                         inv._bytes_to_utf8name_key(bytes))
 
1141
 
 
1142
    def test_dir_entry_to_bytes(self):
 
1143
        inv = CHKInventory(None)
 
1144
        ie = inventory.InventoryDirectory('dir-id', 'dirname', 'parent-id')
 
1145
        ie.revision = 'dir-rev-id'
 
1146
        bytes = inv._entry_to_bytes(ie)
 
1147
        self.assertEqual('dir: dir-id\nparent-id\ndirname\ndir-rev-id', bytes)
 
1148
        ie2 = inv._bytes_to_entry(bytes)
 
1149
        self.assertEqual(ie, ie2)
 
1150
        self.assertIsInstance(ie2.name, unicode)
 
1151
        self.assertEqual(('dirname', 'dir-id', 'dir-rev-id'),
 
1152
                         inv._bytes_to_utf8name_key(bytes))
 
1153
 
 
1154
    def test_dir2_entry_to_bytes(self):
 
1155
        inv = CHKInventory(None)
 
1156
        ie = inventory.InventoryDirectory('dir-id', u'dir\u03a9name',
 
1157
                                          None)
 
1158
        ie.revision = 'dir-rev-id'
 
1159
        bytes = inv._entry_to_bytes(ie)
 
1160
        self.assertEqual('dir: dir-id\n\ndir\xce\xa9name\n'
 
1161
                         'dir-rev-id', bytes)
 
1162
        ie2 = inv._bytes_to_entry(bytes)
 
1163
        self.assertEqual(ie, ie2)
 
1164
        self.assertIsInstance(ie2.name, unicode)
 
1165
        self.assertIs(ie2.parent_id, None)
 
1166
        self.assertEqual(('dir\xce\xa9name', 'dir-id', 'dir-rev-id'),
 
1167
                         inv._bytes_to_utf8name_key(bytes))
 
1168
 
 
1169
    def test_symlink_entry_to_bytes(self):
 
1170
        inv = CHKInventory(None)
 
1171
        ie = inventory.InventoryLink('link-id', 'linkname', 'parent-id')
 
1172
        ie.revision = 'link-rev-id'
 
1173
        ie.symlink_target = u'target/path'
 
1174
        bytes = inv._entry_to_bytes(ie)
 
1175
        self.assertEqual('symlink: link-id\nparent-id\nlinkname\n'
 
1176
                         'link-rev-id\ntarget/path', bytes)
 
1177
        ie2 = inv._bytes_to_entry(bytes)
 
1178
        self.assertEqual(ie, ie2)
 
1179
        self.assertIsInstance(ie2.name, unicode)
 
1180
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1181
        self.assertEqual(('linkname', 'link-id', 'link-rev-id'),
 
1182
                         inv._bytes_to_utf8name_key(bytes))
 
1183
 
 
1184
    def test_symlink2_entry_to_bytes(self):
 
1185
        inv = CHKInventory(None)
 
1186
        ie = inventory.InventoryLink('link-id', u'link\u03a9name', 'parent-id')
 
1187
        ie.revision = 'link-rev-id'
 
1188
        ie.symlink_target = u'target/\u03a9path'
 
1189
        bytes = inv._entry_to_bytes(ie)
 
1190
        self.assertEqual('symlink: link-id\nparent-id\nlink\xce\xa9name\n'
 
1191
                         'link-rev-id\ntarget/\xce\xa9path', bytes)
 
1192
        ie2 = inv._bytes_to_entry(bytes)
 
1193
        self.assertEqual(ie, ie2)
 
1194
        self.assertIsInstance(ie2.name, unicode)
 
1195
        self.assertIsInstance(ie2.symlink_target, unicode)
 
1196
        self.assertEqual(('link\xce\xa9name', 'link-id', 'link-rev-id'),
 
1197
                         inv._bytes_to_utf8name_key(bytes))
 
1198
 
 
1199
    def test_tree_reference_entry_to_bytes(self):
 
1200
        inv = CHKInventory(None)
 
1201
        ie = inventory.TreeReference('tree-root-id', u'tree\u03a9name',
 
1202
                                     'parent-id')
 
1203
        ie.revision = 'tree-rev-id'
 
1204
        ie.reference_revision = 'ref-rev-id'
 
1205
        bytes = inv._entry_to_bytes(ie)
 
1206
        self.assertEqual('tree: tree-root-id\nparent-id\ntree\xce\xa9name\n'
 
1207
                         'tree-rev-id\nref-rev-id', bytes)
 
1208
        ie2 = inv._bytes_to_entry(bytes)
 
1209
        self.assertEqual(ie, ie2)
 
1210
        self.assertIsInstance(ie2.name, unicode)
 
1211
        self.assertEqual(('tree\xce\xa9name', 'tree-root-id', 'tree-rev-id'),
 
1212
                         inv._bytes_to_utf8name_key(bytes))
 
1213
 
 
1214
 
 
1215
class TestCHKInventoryExpand(tests.TestCaseWithMemoryTransport):
 
1216
 
 
1217
    def get_chk_bytes(self):
 
1218
        factory = groupcompress.make_pack_factory(True, True, 1)
 
1219
        trans = self.get_transport('')
 
1220
        return factory(trans)
 
1221
 
 
1222
    def make_dir(self, inv, name, parent_id):
 
1223
        inv.add(inv.make_entry('directory', name, parent_id, name + '-id'))
 
1224
 
 
1225
    def make_file(self, inv, name, parent_id, content='content\n'):
 
1226
        ie = inv.make_entry('file', name, parent_id, name + '-id')
 
1227
        ie.text_sha1 = osutils.sha_string(content)
 
1228
        ie.text_size = len(content)
 
1229
        inv.add(ie)
 
1230
 
 
1231
    def make_simple_inventory(self):
 
1232
        inv = Inventory('TREE_ROOT')
 
1233
        inv.revision_id = "revid"
 
1234
        inv.root.revision = "rootrev"
 
1235
        # /                 TREE_ROOT
 
1236
        # dir1/             dir1-id
 
1237
        #   sub-file1       sub-file1-id
 
1238
        #   sub-file2       sub-file2-id
 
1239
        #   sub-dir1/       sub-dir1-id
 
1240
        #     subsub-file1  subsub-file1-id
 
1241
        # dir2/             dir2-id
 
1242
        #   sub2-file1      sub2-file1-id
 
1243
        # top               top-id
 
1244
        self.make_dir(inv, 'dir1', 'TREE_ROOT')
 
1245
        self.make_dir(inv, 'dir2', 'TREE_ROOT')
 
1246
        self.make_dir(inv, 'sub-dir1', 'dir1-id')
 
1247
        self.make_file(inv, 'top', 'TREE_ROOT')
 
1248
        self.make_file(inv, 'sub-file1', 'dir1-id')
 
1249
        self.make_file(inv, 'sub-file2', 'dir1-id')
 
1250
        self.make_file(inv, 'subsub-file1', 'sub-dir1-id')
 
1251
        self.make_file(inv, 'sub2-file1', 'dir2-id')
 
1252
        chk_bytes = self.get_chk_bytes()
 
1253
        #  use a small maximum_size to force internal paging structures
 
1254
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
1255
                        maximum_size=100,
 
1256
                        search_key_name='hash-255-way')
 
1257
        bytes = ''.join(chk_inv.to_lines())
 
1258
        return CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
1259
 
 
1260
    def assert_Getitems(self, expected_fileids, inv, file_ids):
 
1261
        self.assertEqual(sorted(expected_fileids),
 
1262
                         sorted([ie.file_id for ie in inv._getitems(file_ids)]))
 
1263
 
 
1264
    def assertExpand(self, all_ids, inv, file_ids):
 
1265
        (val_all_ids,
 
1266
         val_children) = inv._expand_fileids_to_parents_and_children(file_ids)
 
1267
        self.assertEqual(set(all_ids), val_all_ids)
 
1268
        entries = inv._getitems(val_all_ids)
 
1269
        expected_children = {}
 
1270
        for entry in entries:
 
1271
            s = expected_children.setdefault(entry.parent_id, [])
 
1272
            s.append(entry.file_id)
 
1273
        val_children = dict((k, sorted(v)) for k, v
 
1274
                            in val_children.iteritems())
 
1275
        expected_children = dict((k, sorted(v)) for k, v
 
1276
                            in expected_children.iteritems())
 
1277
        self.assertEqual(expected_children, val_children)
 
1278
 
 
1279
    def test_make_simple_inventory(self):
 
1280
        inv = self.make_simple_inventory()
 
1281
        layout = []
 
1282
        for path, entry in inv.iter_entries_by_dir():
 
1283
            layout.append((path, entry.file_id))
 
1284
        self.assertEqual([
 
1285
            ('', 'TREE_ROOT'),
 
1286
            ('dir1', 'dir1-id'),
 
1287
            ('dir2', 'dir2-id'),
 
1288
            ('top', 'top-id'),
 
1289
            ('dir1/sub-dir1', 'sub-dir1-id'),
 
1290
            ('dir1/sub-file1', 'sub-file1-id'),
 
1291
            ('dir1/sub-file2', 'sub-file2-id'),
 
1292
            ('dir1/sub-dir1/subsub-file1', 'subsub-file1-id'),
 
1293
            ('dir2/sub2-file1', 'sub2-file1-id'),
 
1294
            ], layout)
 
1295
 
 
1296
    def test__getitems(self):
 
1297
        inv = self.make_simple_inventory()
 
1298
        # Reading from disk
 
1299
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1300
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1301
        self.assertFalse('sub-file2-id' in inv._fileid_to_entry_cache)
 
1302
        # From cache
 
1303
        self.assert_Getitems(['dir1-id'], inv, ['dir1-id'])
 
1304
        # Mixed
 
1305
        self.assert_Getitems(['dir1-id', 'sub-file2-id'], inv,
 
1306
                             ['dir1-id', 'sub-file2-id'])
 
1307
        self.assertTrue('dir1-id' in inv._fileid_to_entry_cache)
 
1308
        self.assertTrue('sub-file2-id' in inv._fileid_to_entry_cache)
 
1309
 
 
1310
    def test_single_file(self):
 
1311
        inv = self.make_simple_inventory()
 
1312
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1313
 
 
1314
    def test_get_all_parents(self):
 
1315
        inv = self.make_simple_inventory()
 
1316
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1317
                           'subsub-file1-id',
 
1318
                          ], inv, ['subsub-file1-id'])
 
1319
 
 
1320
    def test_get_children(self):
 
1321
        inv = self.make_simple_inventory()
 
1322
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1323
                           'sub-file1-id', 'sub-file2-id', 'subsub-file1-id',
 
1324
                          ], inv, ['dir1-id'])
 
1325
 
 
1326
    def test_from_root(self):
 
1327
        inv = self.make_simple_inventory()
 
1328
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'dir2-id', 'sub-dir1-id',
 
1329
                           'sub-file1-id', 'sub-file2-id', 'sub2-file1-id',
 
1330
                           'subsub-file1-id', 'top-id'], inv, ['TREE_ROOT'])
 
1331
 
 
1332
    def test_top_level_file(self):
 
1333
        inv = self.make_simple_inventory()
 
1334
        self.assertExpand(['TREE_ROOT', 'top-id'], inv, ['top-id'])
 
1335
 
 
1336
    def test_subsub_file(self):
 
1337
        inv = self.make_simple_inventory()
 
1338
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id',
 
1339
                           'subsub-file1-id'], inv, ['subsub-file1-id'])
 
1340
 
 
1341
    def test_sub_and_root(self):
 
1342
        inv = self.make_simple_inventory()
 
1343
        self.assertExpand(['TREE_ROOT', 'dir1-id', 'sub-dir1-id', 'top-id',
 
1344
                           'subsub-file1-id'], inv, ['top-id', 'subsub-file1-id'])