~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_inv.py

create thread for bbc

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
 
18
 
from bzrlib import errors, inventory, osutils
19
 
from bzrlib.inventory import (Inventory, ROOT_ID, InventoryFile,
 
18
from bzrlib import errors, chk_map, inventory, osutils
 
19
from bzrlib.inventory import (CHKInventory, Inventory, ROOT_ID, InventoryFile,
20
20
    InventoryDirectory, InventoryEntry, TreeReference)
21
 
from bzrlib.tests import TestCase
 
21
from bzrlib.tests import TestCase, TestCaseWithTransport
22
22
 
23
23
 
24
24
class TestInventoryEntry(TestCase):
166
166
    def assertChangeDescription(self, expected_change, old_ie, new_ie):
167
167
        change = InventoryEntry.describe_change(old_ie, new_ie)
168
168
        self.assertEqual(expected_change, change)
 
169
 
 
170
 
 
171
class TestCHKInventory(TestCaseWithTransport):
 
172
 
 
173
    def get_chk_bytes(self):
 
174
        # The easiest way to get a CHK store is a development5 repository and
 
175
        # then work with the chk_bytes attribute directly.
 
176
        repo = self.make_repository(".", format="development5")
 
177
        repo.lock_write()
 
178
        self.addCleanup(repo.unlock)
 
179
        repo.start_write_group()
 
180
        self.addCleanup(repo.abort_write_group)
 
181
        return repo.chk_bytes
 
182
 
 
183
    def read_bytes(self, chk_bytes, key):
 
184
        stream = chk_bytes.get_record_stream([key], 'unordered', True)
 
185
        return stream.next().get_bytes_as("fulltext")
 
186
 
 
187
    def test_deserialise_gives_CHKInventory(self):
 
188
        inv = Inventory()
 
189
        inv.revision_id = "revid"
 
190
        inv.root.revision = "rootrev"
 
191
        chk_bytes = self.get_chk_bytes()
 
192
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
193
        bytes = ''.join(chk_inv.to_lines())
 
194
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
195
        self.assertEqual("revid", new_inv.revision_id)
 
196
        self.assertEqual("directory", new_inv.root.kind)
 
197
        self.assertEqual(inv.root.file_id, new_inv.root.file_id)
 
198
        self.assertEqual(inv.root.parent_id, new_inv.root.parent_id)
 
199
        self.assertEqual(inv.root.name, new_inv.root.name)
 
200
        self.assertEqual("rootrev", new_inv.root.revision)
 
201
        self.assertEqual('plain', new_inv._search_key_name)
 
202
 
 
203
    def test_deserialise_wrong_revid(self):
 
204
        inv = Inventory()
 
205
        inv.revision_id = "revid"
 
206
        inv.root.revision = "rootrev"
 
207
        chk_bytes = self.get_chk_bytes()
 
208
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
209
        bytes = ''.join(chk_inv.to_lines())
 
210
        self.assertRaises(ValueError, CHKInventory.deserialise, chk_bytes,
 
211
            bytes, ("revid2",))
 
212
 
 
213
    def test_captures_rev_root_byid(self):
 
214
        inv = Inventory()
 
215
        inv.revision_id = "foo"
 
216
        inv.root.revision = "bar"
 
217
        chk_bytes = self.get_chk_bytes()
 
218
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
219
        lines = chk_inv.to_lines()
 
220
        self.assertEqual([
 
221
            'chkinventory:\n',
 
222
            'revision_id: foo\n',
 
223
            'root_id: TREE_ROOT\n',
 
224
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
225
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
226
            ], lines)
 
227
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
228
        self.assertEqual('plain', chk_inv._search_key_name)
 
229
 
 
230
    def test_captures_parent_id_basename_index(self):
 
231
        inv = Inventory()
 
232
        inv.revision_id = "foo"
 
233
        inv.root.revision = "bar"
 
234
        chk_bytes = self.get_chk_bytes()
 
235
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
236
        lines = chk_inv.to_lines()
 
237
        self.assertEqual([
 
238
            'chkinventory:\n',
 
239
            'revision_id: foo\n',
 
240
            'root_id: TREE_ROOT\n',
 
241
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
242
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
243
            ], lines)
 
244
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
245
        self.assertEqual('plain', chk_inv._search_key_name)
 
246
 
 
247
    def test_captures_search_key_name(self):
 
248
        inv = Inventory()
 
249
        inv.revision_id = "foo"
 
250
        inv.root.revision = "bar"
 
251
        chk_bytes = self.get_chk_bytes()
 
252
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv,
 
253
                                              search_key_name='hash-16-way')
 
254
        lines = chk_inv.to_lines()
 
255
        self.assertEqual([
 
256
            'chkinventory:\n',
 
257
            'search_key_name: hash-16-way\n',
 
258
            'root_id: TREE_ROOT\n',
 
259
            'parent_id_basename_to_file_id: sha1:eb23f0ad4b07f48e88c76d4c94292be57fb2785f\n',
 
260
            'revision_id: foo\n',
 
261
            'id_to_entry: sha1:debfe920f1f10e7929260f0534ac9a24d7aabbb4\n',
 
262
            ], lines)
 
263
        chk_inv = CHKInventory.deserialise(chk_bytes, ''.join(lines), ('foo',))
 
264
        self.assertEqual('hash-16-way', chk_inv._search_key_name)
 
265
 
 
266
    def test_directory_children_on_demand(self):
 
267
        inv = Inventory()
 
268
        inv.revision_id = "revid"
 
269
        inv.root.revision = "rootrev"
 
270
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
271
        inv["fileid"].revision = "filerev"
 
272
        inv["fileid"].executable = True
 
273
        inv["fileid"].text_sha1 = "ffff"
 
274
        inv["fileid"].text_size = 1
 
275
        chk_bytes = self.get_chk_bytes()
 
276
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
277
        bytes = ''.join(chk_inv.to_lines())
 
278
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
279
        root_entry = new_inv[inv.root.file_id]
 
280
        self.assertEqual(None, root_entry._children)
 
281
        self.assertEqual(['file'], root_entry.children.keys())
 
282
        file_direct = new_inv["fileid"]
 
283
        file_found = root_entry.children['file']
 
284
        self.assertEqual(file_direct.kind, file_found.kind)
 
285
        self.assertEqual(file_direct.file_id, file_found.file_id)
 
286
        self.assertEqual(file_direct.parent_id, file_found.parent_id)
 
287
        self.assertEqual(file_direct.name, file_found.name)
 
288
        self.assertEqual(file_direct.revision, file_found.revision)
 
289
        self.assertEqual(file_direct.text_sha1, file_found.text_sha1)
 
290
        self.assertEqual(file_direct.text_size, file_found.text_size)
 
291
        self.assertEqual(file_direct.executable, file_found.executable)
 
292
 
 
293
    def test_from_inventory_maximum_size(self):
 
294
        # from_inventory supports the maximum_size parameter.
 
295
        inv = Inventory()
 
296
        inv.revision_id = "revid"
 
297
        inv.root.revision = "rootrev"
 
298
        chk_bytes = self.get_chk_bytes()
 
299
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv, 120)
 
300
        self.assertEqual(120, chk_inv.id_to_entry._root_node.maximum_size)
 
301
 
 
302
    def test___iter__(self):
 
303
        inv = Inventory()
 
304
        inv.revision_id = "revid"
 
305
        inv.root.revision = "rootrev"
 
306
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
307
        inv["fileid"].revision = "filerev"
 
308
        inv["fileid"].executable = True
 
309
        inv["fileid"].text_sha1 = "ffff"
 
310
        inv["fileid"].text_size = 1
 
311
        chk_bytes = self.get_chk_bytes()
 
312
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
313
        bytes = ''.join(chk_inv.to_lines())
 
314
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
315
        fileids = list(new_inv.__iter__())
 
316
        fileids.sort()
 
317
        self.assertEqual([inv.root.file_id, "fileid"], fileids)
 
318
 
 
319
    def test__len__(self):
 
320
        inv = Inventory()
 
321
        inv.revision_id = "revid"
 
322
        inv.root.revision = "rootrev"
 
323
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
324
        inv["fileid"].revision = "filerev"
 
325
        inv["fileid"].executable = True
 
326
        inv["fileid"].text_sha1 = "ffff"
 
327
        inv["fileid"].text_size = 1
 
328
        chk_bytes = self.get_chk_bytes()
 
329
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
330
        self.assertEqual(2, len(chk_inv))
 
331
 
 
332
    def test___getitem__(self):
 
333
        inv = Inventory()
 
334
        inv.revision_id = "revid"
 
335
        inv.root.revision = "rootrev"
 
336
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
337
        inv["fileid"].revision = "filerev"
 
338
        inv["fileid"].executable = True
 
339
        inv["fileid"].text_sha1 = "ffff"
 
340
        inv["fileid"].text_size = 1
 
341
        chk_bytes = self.get_chk_bytes()
 
342
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
343
        bytes = ''.join(chk_inv.to_lines())
 
344
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
345
        root_entry = new_inv[inv.root.file_id]
 
346
        file_entry = new_inv["fileid"]
 
347
        self.assertEqual("directory", root_entry.kind)
 
348
        self.assertEqual(inv.root.file_id, root_entry.file_id)
 
349
        self.assertEqual(inv.root.parent_id, root_entry.parent_id)
 
350
        self.assertEqual(inv.root.name, root_entry.name)
 
351
        self.assertEqual("rootrev", root_entry.revision)
 
352
        self.assertEqual("file", file_entry.kind)
 
353
        self.assertEqual("fileid", file_entry.file_id)
 
354
        self.assertEqual(inv.root.file_id, file_entry.parent_id)
 
355
        self.assertEqual("file", file_entry.name)
 
356
        self.assertEqual("filerev", file_entry.revision)
 
357
        self.assertEqual("ffff", file_entry.text_sha1)
 
358
        self.assertEqual(1, file_entry.text_size)
 
359
        self.assertEqual(True, file_entry.executable)
 
360
        self.assertRaises(errors.NoSuchId, new_inv.__getitem__, 'missing')
 
361
 
 
362
    def test_has_id_true(self):
 
363
        inv = Inventory()
 
364
        inv.revision_id = "revid"
 
365
        inv.root.revision = "rootrev"
 
366
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
367
        inv["fileid"].revision = "filerev"
 
368
        inv["fileid"].executable = True
 
369
        inv["fileid"].text_sha1 = "ffff"
 
370
        inv["fileid"].text_size = 1
 
371
        chk_bytes = self.get_chk_bytes()
 
372
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
373
        self.assertTrue(chk_inv.has_id('fileid'))
 
374
        self.assertTrue(chk_inv.has_id(inv.root.file_id))
 
375
 
 
376
    def test_has_id_not(self):
 
377
        inv = Inventory()
 
378
        inv.revision_id = "revid"
 
379
        inv.root.revision = "rootrev"
 
380
        chk_bytes = self.get_chk_bytes()
 
381
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
382
        self.assertFalse(chk_inv.has_id('fileid'))
 
383
 
 
384
    def test_id2path(self):
 
385
        inv = Inventory()
 
386
        inv.revision_id = "revid"
 
387
        inv.root.revision = "rootrev"
 
388
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
389
        fileentry = InventoryFile("fileid", "file", "dirid")
 
390
        inv.add(direntry)
 
391
        inv.add(fileentry)
 
392
        inv["fileid"].revision = "filerev"
 
393
        inv["fileid"].executable = True
 
394
        inv["fileid"].text_sha1 = "ffff"
 
395
        inv["fileid"].text_size = 1
 
396
        inv["dirid"].revision = "filerev"
 
397
        chk_bytes = self.get_chk_bytes()
 
398
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
399
        bytes = ''.join(chk_inv.to_lines())
 
400
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
401
        self.assertEqual('', new_inv.id2path(inv.root.file_id))
 
402
        self.assertEqual('dir', new_inv.id2path('dirid'))
 
403
        self.assertEqual('dir/file', new_inv.id2path('fileid'))
 
404
 
 
405
    def test_path2id(self):
 
406
        inv = Inventory()
 
407
        inv.revision_id = "revid"
 
408
        inv.root.revision = "rootrev"
 
409
        direntry = InventoryDirectory("dirid", "dir", inv.root.file_id)
 
410
        fileentry = InventoryFile("fileid", "file", "dirid")
 
411
        inv.add(direntry)
 
412
        inv.add(fileentry)
 
413
        inv["fileid"].revision = "filerev"
 
414
        inv["fileid"].executable = True
 
415
        inv["fileid"].text_sha1 = "ffff"
 
416
        inv["fileid"].text_size = 1
 
417
        inv["dirid"].revision = "filerev"
 
418
        chk_bytes = self.get_chk_bytes()
 
419
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
420
        bytes = ''.join(chk_inv.to_lines())
 
421
        new_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
422
        self.assertEqual(inv.root.file_id, new_inv.path2id(''))
 
423
        self.assertEqual('dirid', new_inv.path2id('dir'))
 
424
        self.assertEqual('fileid', new_inv.path2id('dir/file'))
 
425
 
 
426
    def test_create_by_apply_delta_sets_root(self):
 
427
        inv = Inventory()
 
428
        inv.revision_id = "revid"
 
429
        chk_bytes = self.get_chk_bytes()
 
430
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
431
        inv.add_path("", "directory", "myrootid", None)
 
432
        inv.revision_id = "expectedid"
 
433
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
434
        delta = [(None, "",  "myrootid", inv.root)]
 
435
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
436
        self.assertEquals(reference_inv.root, new_inv.root)
 
437
 
 
438
    def test_create_by_apply_delta_empty_add_child(self):
 
439
        inv = Inventory()
 
440
        inv.revision_id = "revid"
 
441
        inv.root.revision = "rootrev"
 
442
        chk_bytes = self.get_chk_bytes()
 
443
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
444
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
445
        a_entry.revision = "filerev"
 
446
        a_entry.executable = True
 
447
        a_entry.text_sha1 = "ffff"
 
448
        a_entry.text_size = 1
 
449
        inv.add(a_entry)
 
450
        inv.revision_id = "expectedid"
 
451
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
452
        delta = [(None, "A",  "A-id", a_entry)]
 
453
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
454
        # new_inv should be the same as reference_inv.
 
455
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
456
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
457
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
458
            new_inv.id_to_entry._root_node._key)
 
459
 
 
460
    def test_create_by_apply_delta_empty_add_child_updates_parent_id(self):
 
461
        inv = Inventory()
 
462
        inv.revision_id = "revid"
 
463
        inv.root.revision = "rootrev"
 
464
        chk_bytes = self.get_chk_bytes()
 
465
        base_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
466
        a_entry = InventoryFile("A-id", "A", inv.root.file_id)
 
467
        a_entry.revision = "filerev"
 
468
        a_entry.executable = True
 
469
        a_entry.text_sha1 = "ffff"
 
470
        a_entry.text_size = 1
 
471
        inv.add(a_entry)
 
472
        inv.revision_id = "expectedid"
 
473
        reference_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
474
        delta = [(None, "A",  "A-id", a_entry)]
 
475
        new_inv = base_inv.create_by_apply_delta(delta, "expectedid")
 
476
        # new_inv should be the same as reference_inv.
 
477
        self.assertEqual(reference_inv.revision_id, new_inv.revision_id)
 
478
        self.assertEqual(reference_inv.root_id, new_inv.root_id)
 
479
        self.assertEqual(reference_inv.id_to_entry._root_node._key,
 
480
            new_inv.id_to_entry._root_node._key)
 
481
        self.assertEqual(reference_inv.parent_id_basename_to_file_id._root_node._key,
 
482
            new_inv.parent_id_basename_to_file_id._root_node._key)
 
483
 
 
484
    def test_iter_changes(self):
 
485
        # Low level bootstrapping smoke test; comprehensive generic tests via
 
486
        # InterTree are coming.
 
487
        inv = Inventory()
 
488
        inv.revision_id = "revid"
 
489
        inv.root.revision = "rootrev"
 
490
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
491
        inv["fileid"].revision = "filerev"
 
492
        inv["fileid"].executable = True
 
493
        inv["fileid"].text_sha1 = "ffff"
 
494
        inv["fileid"].text_size = 1
 
495
        inv2 = Inventory()
 
496
        inv2.revision_id = "revid2"
 
497
        inv2.root.revision = "rootrev"
 
498
        inv2.add(InventoryFile("fileid", "file", inv.root.file_id))
 
499
        inv2["fileid"].revision = "filerev2"
 
500
        inv2["fileid"].executable = False
 
501
        inv2["fileid"].text_sha1 = "bbbb"
 
502
        inv2["fileid"].text_size = 2
 
503
        # get fresh objects.
 
504
        chk_bytes = self.get_chk_bytes()
 
505
        chk_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
506
        bytes = ''.join(chk_inv.to_lines())
 
507
        inv_1 = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
508
        chk_inv2 = CHKInventory.from_inventory(chk_bytes, inv2)
 
509
        bytes = ''.join(chk_inv2.to_lines())
 
510
        inv_2 = CHKInventory.deserialise(chk_bytes, bytes, ("revid2",))
 
511
        self.assertEqual([('fileid', (u'file', u'file'), True, (True, True),
 
512
            ('TREE_ROOT', 'TREE_ROOT'), (u'file', u'file'), ('file', 'file'),
 
513
            (False, True))],
 
514
            list(inv_1.iter_changes(inv_2)))
 
515
 
 
516
    def test_parent_id_basename_to_file_id_index_enabled(self):
 
517
        inv = Inventory()
 
518
        inv.revision_id = "revid"
 
519
        inv.root.revision = "rootrev"
 
520
        inv.add(InventoryFile("fileid", "file", inv.root.file_id))
 
521
        inv["fileid"].revision = "filerev"
 
522
        inv["fileid"].executable = True
 
523
        inv["fileid"].text_sha1 = "ffff"
 
524
        inv["fileid"].text_size = 1
 
525
        # get fresh objects.
 
526
        chk_bytes = self.get_chk_bytes()
 
527
        tmp_inv = CHKInventory.from_inventory(chk_bytes, inv)
 
528
        bytes = ''.join(tmp_inv.to_lines())
 
529
        chk_inv = CHKInventory.deserialise(chk_bytes, bytes, ("revid",))
 
530
        self.assertIsInstance(chk_inv.parent_id_basename_to_file_id, chk_map.CHKMap)
 
531
        self.assertEqual(
 
532
            {('', ''): 'TREE_ROOT', ('TREE_ROOT', 'file'): 'fileid'},
 
533
            dict(chk_inv.parent_id_basename_to_file_id.iteritems()))