~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/merge_core.py

  • Committer: Martin Pool
  • Date: 2005-08-29 10:57:01 UTC
  • mfrom: (1092.1.41)
  • Revision ID: mbp@sourcefrog.net-20050829105701-7aaa81ecf1bfee05
- merge in merge improvements and additional tests 
  from aaron and lifeless

robertc@robertcollins.net-20050825131100-85772edabc817481

Show diffs side-by-side

added added

removed removed

Lines of Context:
275
275
            os.chmod(filename, other_stat)
276
276
        else:
277
277
            conflict_handler.permission_conflict(filename, base, other)
278
 
 
279
 
 
280
 
import unittest
281
 
import tempfile
282
 
import shutil
283
 
from bzrlib.inventory import InventoryEntry, RootEntry
284
 
from osutils import file_kind
285
 
class FalseTree(object):
    """Inventory-style adapter around a tree that maps file ids to paths.

    The wrapped *realtree* must provide an ``inventory`` mapping of
    ``file_id -> tree-relative path`` and an ``abs_path(path)`` method.
    Indexing this object by file id builds an inventory entry on demand.
    """

    def __init__(self, realtree):
        self._realtree = realtree
        # The adapter acts as its own inventory, so ``obj.inventory[file_id]``
        # is routed through __getitem__ below.
        self.inventory = self

    def __getitem__(self, file_id):
        """Return the entry for *file_id*; raise KeyError if it is unknown."""
        entry = self.make_inventory_entry(file_id)
        if entry is None:
            raise KeyError(file_id)
        return entry

    def make_inventory_entry(self, file_id):
        """Build an inventory entry for *file_id* from the wrapped tree.

        Returns None when the wrapped inventory has no path for *file_id*,
        a RootEntry when the path is the empty string, and otherwise an
        InventoryEntry whose parent is the id registered for the path's
        directory part.

        Raises Exception if no id is registered for that directory.
        """
        path_by_id = self._realtree.inventory
        path = path_by_id.get(file_id)
        if path is None:
            return None
        if path == "":
            return RootEntry(file_id)
        parent_path, name = os.path.split(path)
        # Determine the kind before searching for the parent, so a missing
        # file on disk is reported first.
        kind = file_kind(self._realtree.abs_path(path))
        for parent_id, candidate in path_by_id.items():
            if candidate == parent_path:
                return InventoryEntry(file_id, name, kind, parent_id)
        raise Exception("Can't find parent for %s" % name)
310
 
 
311
 
 
312
 
class MergeTree(object):
    """A directory on disk paired with a ``{file_id: path}`` inventory.

    Paths stored in ``self.inventory`` are relative to ``self.dir``.  The
    methods keep the files on disk and the inventory in step.
    """

    def __init__(self, dir):
        """Create directory *dir* and an inventory holding only the root.

        The root is registered under id ``'0'`` with the empty path.
        """
        self.dir = dir
        os.mkdir(dir)
        self.inventory = {'0': ""}
        self.tree = FalseTree(self)

    def child_path(self, parent, name):
        """Return the tree-relative path of *name* inside id *parent*."""
        return os.path.join(self.inventory[parent], name)

    def add_file(self, id, parent, name, contents, mode):
        """Create file *name* under *parent* and register it as *id*.

        *contents* is written in binary mode and the file is then given
        permission bits *mode*.  The target must not exist yet.
        """
        path = self.child_path(parent, name)
        full_path = self.abs_path(path)
        assert not os.path.exists(full_path)
        # Close the handle explicitly instead of relying on garbage
        # collection to flush and release it.
        handle = open(full_path, "wb")
        try:
            handle.write(contents)
        finally:
            handle.close()
        os.chmod(full_path, mode)
        self.inventory[id] = path

    def remove_file(self, id):
        """Delete the file registered as *id* and forget the id."""
        os.unlink(self.full_path(id))
        del self.inventory[id]

    def add_dir(self, id, parent, name, mode):
        """Create directory *name* under *parent* and register it as *id*.

        The target must not exist yet; it is given permission bits *mode*.
        """
        path = self.child_path(parent, name)
        full_path = self.abs_path(path)
        assert not os.path.exists(full_path)
        os.mkdir(full_path)
        os.chmod(full_path, mode)
        self.inventory[id] = path

    def abs_path(self, path):
        """Return tree-relative *path* joined onto this tree's directory."""
        return os.path.join(self.dir, path)

    def full_path(self, id):
        """Return the on-disk path for *id*, or None if *id* is unknown."""
        try:
            tree_path = self.inventory[id]
        except KeyError:
            return None
        return self.abs_path(tree_path)

    def readonly_path(self, id):
        """Return the same path as full_path(id)."""
        return self.full_path(id)

    def __contains__(self, file_id):
        return file_id in self.inventory

    def has_or_had_id(self, file_id):
        """Return whether *file_id* is currently in the inventory."""
        return file_id in self

    def get_file(self, file_id):
        """Open the file for *file_id* in binary read mode.

        The caller is responsible for closing the returned file object.
        """
        path = self.readonly_path(file_id)
        return open(path, "rb")

    def id2path(self, file_id):
        """Return the tree-relative path for *file_id* (KeyError if unknown)."""
        return self.inventory[file_id]

    def change_path(self, id, path):
        """Rename the item registered as *id* to tree-relative *path*."""
        os.rename(self.abs_path(self.inventory[id]), self.abs_path(path))
        self.inventory[id] = path
372
 
 
373
 
class MergeBuilder(object):
    """Build parallel BASE, THIS and OTHER trees plus a changeset for tests.

    Three MergeTree instances are created under one temporary directory.
    The ``add_*`` methods populate all three identically; ``remove_file``
    and the ``change_*`` methods then alter selected trees and record the
    difference between BASE and OTHER in ``self.cset``.  Call ``cleanup()``
    to delete the temporary directory when done.
    """

    def __init__(self):
        self.dir = tempfile.mkdtemp(prefix="BaZing")
        self.base = MergeTree(os.path.join(self.dir, "base"))
        self.this = MergeTree(os.path.join(self.dir, "this"))
        self.other = MergeTree(os.path.join(self.dir, "other"))

        self.cset = changeset.Changeset()
        # Entry "0" is the changeset counterpart of the root id "0" that
        # each MergeTree registers on creation.
        self.cset.add_entry(changeset.ChangesetEntry("0",
                                                     changeset.NULL_ID, "./."))

    def _read_file(self, path):
        """Return the contents of *path*, closing the file afterwards."""
        handle = open(path)
        try:
            return handle.read()
        finally:
            handle.close()

    def get_cset_path(self, parent, name):
        """Return the changeset path of *name* under entry *parent*.

        Returns None when *name* is None (in which case *parent* must be
        None as well).
        """
        if name is None:
            assert (parent is None)
            return None
        return os.path.join(self.cset.entries[parent].path, name)

    def add_file(self, id, parent, name, contents, mode):
        """Add an identical file to all three trees and to the changeset."""
        self.base.add_file(id, parent, name, contents, mode)
        self.this.add_file(id, parent, name, contents, mode)
        self.other.add_file(id, parent, name, contents, mode)
        path = self.get_cset_path(parent, name)
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))

    def remove_file(self, id, base=False, this=False, other=False):
        """Delete *id* from the selected trees and update the changeset.

        When the file is removed from BASE or OTHER, the entry's
        ``contents_change`` is made to describe the removal: the side that
        was deleted becomes None and a side that still exists is recorded
        as a FileCreate of its current contents.  If both sides end up None
        the contents change is cleared.
        """
        for option, tree in ((base, self.base), (this, self.this),
                             (other, self.other)):
            if option:
                tree.remove_file(id)

        # Only BASE and OTHER take part in the changeset, and the update
        # does not depend on the loop above, so it is done once here.
        if not (other or base):
            return

        entry = self.cset.entries[id]
        change = entry.contents_change
        if change is None:
            change = changeset.ReplaceContents(None, None)
            entry.contents_change = change

            def create_file(tree):
                handle = tree.get_file(id)
                try:
                    return changeset.FileCreate(handle.read())
                finally:
                    handle.close()

            if not other:
                change.new_contents = create_file(self.other)
            if not base:
                change.old_contents = create_file(self.base)
        else:
            assert isinstance(change, changeset.ReplaceContents)
        if other:
            change.new_contents = None
        if base:
            change.old_contents = None
        if change.old_contents is None and change.new_contents is None:
            # Nothing on either side: clear the change on the entry itself
            # (rebinding only the local name would leave it in place).
            entry.contents_change = None

    def add_dir(self, id, parent, name, mode):
        """Add an identical directory to all three trees and the changeset."""
        path = self.get_cset_path(parent, name)
        self.base.add_dir(id, parent, name, mode)
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
        self.this.add_dir(id, parent, name, mode)
        self.other.add_dir(id, parent, name, mode)

    def change_name(self, id, base=None, this=None, other=None):
        """Rename *id* in each tree for which a new name is given.

        A BASE rename sets the entry's ``name`` and an OTHER rename sets
        its ``new_name``; a THIS rename touches only the tree.
        """
        if base is not None:
            self.change_name_tree(id, self.base, base)
            self.cset.entries[id].name = base

        if this is not None:
            self.change_name_tree(id, self.this, this)

        if other is not None:
            self.change_name_tree(id, self.other, other)
            self.cset.entries[id].new_name = other

    def change_parent(self, id, base=None, this=None, other=None):
        """Move *id* under a new parent id in each tree that is given one.

        A BASE move sets the entry's ``parent``/``dir`` and an OTHER move
        sets ``new_parent``/``new_dir``; a THIS move touches only the tree.
        """
        if base is not None:
            self.change_parent_tree(id, self.base, base)
            self.cset.entries[id].parent = base
            self.cset.entries[id].dir = self.cset.entries[base].path

        if this is not None:
            self.change_parent_tree(id, self.this, this)

        if other is not None:
            self.change_parent_tree(id, self.other, other)
            self.cset.entries[id].new_parent = other
            self.cset.entries[id].new_dir = \
                self.cset.entries[other].new_path

    def change_contents(self, id, base=None, this=None, other=None):
        """Rewrite the contents of *id* in each tree that is given text.

        If BASE or OTHER was rewritten, the entry's ``contents_change`` is
        replaced by a ReplaceFileContents built from the current BASE and
        OTHER file contents.
        """
        if base is not None:
            self.change_contents_tree(id, self.base, base)

        if this is not None:
            self.change_contents_tree(id, self.this, this)

        if other is not None:
            self.change_contents_tree(id, self.other, other)

        if base is not None or other is not None:
            old_contents = self._read_file(self.base.full_path(id))
            new_contents = self._read_file(self.other.full_path(id))
            contents = changeset.ReplaceFileContents(old_contents,
                                                     new_contents)
            self.cset.entries[id].contents_change = contents

    def change_perms(self, id, base=None, this=None, other=None):
        """Change the mode of *id* in each tree that is given a mode.

        If BASE or OTHER was changed, the entry's ``metadata_change`` is
        replaced by a ChangeUnixPermissions built from the current BASE and
        OTHER permission bits.
        """
        import stat

        if base is not None:
            self.change_perms_tree(id, self.base, base)

        if this is not None:
            self.change_perms_tree(id, self.this, this)

        if other is not None:
            self.change_perms_tree(id, self.other, other)

        if base is not None or other is not None:
            # All nine rwx bits (octal 777).  Masking with octal 77 dropped
            # the owner bits, so e.g. 755 and 655 recorded the same value.
            perm_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
            old_perms = os.stat(self.base.full_path(id)).st_mode & perm_mask
            new_perms = os.stat(self.other.full_path(id)).st_mode & perm_mask
            contents = changeset.ChangeUnixPermissions(old_perms,
                                                       new_perms)
            self.cset.entries[id].metadata_change = contents

    def change_name_tree(self, id, tree, name):
        """Rename *id* to *name* in *tree*, keeping the entry's parent."""
        new_path = tree.child_path(self.cset.entries[id].parent, name)
        tree.change_path(id, new_path)

    def change_parent_tree(self, id, tree, parent):
        """Move *id* under *parent* in *tree*, keeping the entry's name."""
        new_path = tree.child_path(parent, self.cset.entries[id].name)
        tree.change_path(id, new_path)

    def change_contents_tree(self, id, tree, contents):
        """Overwrite the file for *id* in *tree* with *contents*."""
        path = tree.full_path(id)
        mode = os.stat(path).st_mode
        handle = open(path, "w")
        try:
            handle.write(contents)
        finally:
            handle.close()
        # Re-apply the mode that was recorded before the rewrite.
        os.chmod(path, mode)

    def change_perms_tree(self, id, tree, mode):
        """Set the mode of the file for *id* in *tree* to *mode*."""
        os.chmod(tree.full_path(id), mode)

    def merge_changeset(self, merge_factory):
        """Return make_merge_changeset() applied to the three trees.

        Conflicts are routed to an ExceptionConflictHandler created for
        the THIS directory.
        """
        conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
        return make_merge_changeset(self.cset, self.this, self.base,
                                    self.other, conflict_handler,
                                    merge_factory)

    def apply_inv_change(self, inventory_change, orig_inventory):
        """Return a new ``{file_id: path}`` inventory after a change.

        *inventory_change* maps file ids to their new path, with None for
        an id that no longer exists.  Ids from *orig_inventory* that are
        not listed keep their own name but follow their parent directory
        to its new location.  Ids that appear only in *inventory_change*
        are added as given.
        """
        orig_inventory_by_path = {}
        for file_id, path in orig_inventory.items():
            orig_inventory_by_path[path] = file_id

        def parent_id(file_id):
            parent_dir = os.path.dirname(orig_inventory[file_id])
            if parent_dir == "":
                return None
            return orig_inventory_by_path[parent_dir]

        def new_path(file_id):
            if file_id in inventory_change:
                return inventory_change[file_id]
            parent = parent_id(file_id)
            if parent is None:
                return orig_inventory[file_id]
            dirname = new_path(parent)
            # Append only the final component: the original path already
            # contains the old parent directory, so joining it whole would
            # yield paths such as "dir1/dir1/file1".
            return os.path.join(dirname,
                                os.path.basename(orig_inventory[file_id]))

        new_inventory = {}
        for file_id in orig_inventory:
            path = new_path(file_id)
            if path is None:
                continue
            new_inventory[file_id] = path

        for file_id, path in inventory_change.items():
            if file_id in orig_inventory:
                continue
            new_inventory[file_id] = path
        return new_inventory

    def apply_changeset(self, cset, conflict_handler=None, reverse=False):
        """Apply *cset* to the THIS tree and refresh its inventory."""
        inventory_change = changeset.apply_changeset(cset,
                                                     self.this.inventory,
                                                     self.this.dir,
                                                     conflict_handler, reverse)
        self.this.inventory = self.apply_inv_change(inventory_change,
                                                    self.this.inventory)

    def cleanup(self):
        """Delete the temporary directory that holds all three trees."""
        shutil.rmtree(self.dir)
568
 
 
569
 
 
570
 
class MergeTest(unittest.TestCase):
571
 
    def test_change_name(self):
572
 
        """Test renames"""
573
 
        builder = MergeBuilder()
574
 
        builder.add_file("1", "0", "name1", "hello1", 0755)
575
 
        builder.change_name("1", other="name2")
576
 
        builder.add_file("2", "0", "name3", "hello2", 0755)
577
 
        builder.change_name("2", base="name4")
578
 
        builder.add_file("3", "0", "name5", "hello3", 0755)
579
 
        builder.change_name("3", this="name6")
580
 
        cset = builder.merge_changeset(ApplyMerge3)
581
 
        assert(cset.entries["2"].is_boring())
582
 
        assert(cset.entries["1"].name == "name1")
583
 
        assert(cset.entries["1"].new_name == "name2")
584
 
        assert(cset.entries["3"].is_boring())
585
 
        for tree in (builder.this, builder.other, builder.base):
586
 
            assert(tree.dir != builder.dir and 
587
 
                   tree.dir.startswith(builder.dir))
588
 
            for path in tree.inventory.itervalues():
589
 
                fullpath = tree.abs_path(path)
590
 
                assert(fullpath.startswith(tree.dir))
591
 
                assert(not path.startswith(tree.dir))
592
 
                assert os.path.exists(fullpath)
593
 
        builder.apply_changeset(cset)
594
 
        builder.cleanup()
595
 
        builder = MergeBuilder()
596
 
        builder.add_file("1", "0", "name1", "hello1", 0644)
597
 
        builder.change_name("1", other="name2", this="name3")
598
 
        self.assertRaises(changeset.RenameConflict, 
599
 
                          builder.merge_changeset, ApplyMerge3)
600
 
        builder.cleanup()
601
 
        
602
 
    def test_file_moves(self):
603
 
        """Test moves"""
604
 
        builder = MergeBuilder()
605
 
        builder.add_dir("1", "0", "dir1", 0755)
606
 
        builder.add_dir("2", "0", "dir2", 0755)
607
 
        builder.add_file("3", "1", "file1", "hello1", 0644)
608
 
        builder.add_file("4", "1", "file2", "hello2", 0644)
609
 
        builder.add_file("5", "1", "file3", "hello3", 0644)
610
 
        builder.change_parent("3", other="2")
611
 
        assert(Inventory(builder.other.inventory).get_parent("3") == "2")
612
 
        builder.change_parent("4", this="2")
613
 
        assert(Inventory(builder.this.inventory).get_parent("4") == "2")
614
 
        builder.change_parent("5", base="2")
615
 
        assert(Inventory(builder.base.inventory).get_parent("5") == "2")
616
 
        cset = builder.merge_changeset(ApplyMerge3)
617
 
        for id in ("1", "2", "4", "5"):
618
 
            assert(cset.entries[id].is_boring())
619
 
        assert(cset.entries["3"].parent == "1")
620
 
        assert(cset.entries["3"].new_parent == "2")
621
 
        builder.apply_changeset(cset)
622
 
        builder.cleanup()
623
 
 
624
 
        builder = MergeBuilder()
625
 
        builder.add_dir("1", "0", "dir1", 0755)
626
 
        builder.add_dir("2", "0", "dir2", 0755)
627
 
        builder.add_dir("3", "0", "dir3", 0755)
628
 
        builder.add_file("4", "1", "file1", "hello1", 0644)
629
 
        builder.change_parent("4", other="2", this="3")
630
 
        self.assertRaises(changeset.MoveConflict, 
631
 
                          builder.merge_changeset, ApplyMerge3)
632
 
        builder.cleanup()
633
 
 
634
 
    def test_contents_merge(self):
635
 
        """Test merge3 merging"""
636
 
        self.do_contents_test(ApplyMerge3)
637
 
 
638
 
    def test_contents_merge2(self):
639
 
        """Test diff3 merging"""
640
 
        self.do_contents_test(changeset.Diff3Merge)
641
 
 
642
 
    def test_contents_merge3(self):
643
 
        """Test diff3 merging"""
644
 
        def backup_merge(file_id, base, other):
645
 
            return BackupBeforeChange(ApplyMerge3(file_id, base, other))
646
 
        builder = self.contents_test_success(backup_merge)
647
 
        def backup_exists(file_id):
648
 
            return os.path.exists(builder.this.full_path(file_id)+"~")
649
 
        assert backup_exists("1")
650
 
        assert backup_exists("2")
651
 
        assert not backup_exists("3")
652
 
        builder.cleanup()
653
 
 
654
 
    def do_contents_test(self, merge_factory):
655
 
        """Test merging with specified ContentsChange factory"""
656
 
        builder = self.contents_test_success(merge_factory)
657
 
        builder.cleanup()
658
 
        self.contents_test_conflicts(merge_factory)
659
 
 
660
 
    def contents_test_success(self, merge_factory):
661
 
        from inspect import isclass
662
 
        builder = MergeBuilder()
663
 
        builder.add_file("1", "0", "name1", "text1", 0755)
664
 
        builder.change_contents("1", other="text4")
665
 
        builder.add_file("2", "0", "name3", "text2", 0655)
666
 
        builder.change_contents("2", base="text5")
667
 
        builder.add_file("3", "0", "name5", "text3", 0744)
668
 
        builder.add_file("4", "0", "name6", "text4", 0744)
669
 
        builder.remove_file("4", base=True)
670
 
        assert not builder.cset.entries["4"].is_boring()
671
 
        builder.change_contents("3", this="text6")
672
 
        cset = builder.merge_changeset(merge_factory)
673
 
        assert(cset.entries["1"].contents_change is not None)
674
 
        if isclass(merge_factory):
675
 
            assert(isinstance(cset.entries["1"].contents_change,
676
 
                          merge_factory))
677
 
            assert(isinstance(cset.entries["2"].contents_change,
678
 
                          merge_factory))
679
 
        assert(cset.entries["3"].is_boring())
680
 
        assert(cset.entries["4"].is_boring())
681
 
        builder.apply_changeset(cset)
682
 
        assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
683
 
        assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
684
 
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
685
 
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
686
 
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
687
 
        return builder
688
 
 
689
 
    def contents_test_conflicts(self, merge_factory):
690
 
        builder = MergeBuilder()
691
 
        builder.add_file("1", "0", "name1", "text1", 0755)
692
 
        builder.change_contents("1", other="text4", this="text3")
693
 
        cset = builder.merge_changeset(merge_factory)
694
 
        self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
695
 
                          cset)
696
 
        builder.cleanup()
697
 
 
698
 
        builder = MergeBuilder()
699
 
        builder.add_file("1", "0", "name1", "text1", 0755)
700
 
        builder.change_contents("1", other="text4", base="text3")
701
 
        builder.remove_file("1", base=True)
702
 
        self.assertRaises(changeset.NewContentsConflict,
703
 
                          builder.merge_changeset, merge_factory)
704
 
        builder.cleanup()
705
 
 
706
 
        builder = MergeBuilder()
707
 
        builder.add_file("1", "0", "name1", "text1", 0755)
708
 
        builder.change_contents("1", other="text4", base="text3")
709
 
        builder.remove_file("1", this=True)
710
 
        self.assertRaises(changeset.MissingForMerge, builder.merge_changeset, 
711
 
                          merge_factory)
712
 
        builder.cleanup()
713
 
 
714
 
    def test_perms_merge(self):
715
 
        builder = MergeBuilder()
716
 
        builder.add_file("1", "0", "name1", "text1", 0755)
717
 
        builder.change_perms("1", other=0655)
718
 
        builder.add_file("2", "0", "name2", "text2", 0755)
719
 
        builder.change_perms("2", base=0655)
720
 
        builder.add_file("3", "0", "name3", "text3", 0755)
721
 
        builder.change_perms("3", this=0655)
722
 
        cset = builder.merge_changeset(ApplyMerge3)
723
 
        assert(cset.entries["1"].metadata_change is not None)
724
 
        assert(isinstance(cset.entries["1"].metadata_change,
725
 
                          PermissionsMerge))
726
 
        assert(isinstance(cset.entries["2"].metadata_change,
727
 
                          PermissionsMerge))
728
 
        assert(cset.entries["3"].is_boring())
729
 
        builder.apply_changeset(cset)
730
 
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
731
 
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
732
 
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
733
 
        builder.cleanup();
734
 
        builder = MergeBuilder()
735
 
        builder.add_file("1", "0", "name1", "text1", 0755)
736
 
        builder.change_perms("1", other=0655, base=0555)
737
 
        cset = builder.merge_changeset(ApplyMerge3)
738
 
        self.assertRaises(changeset.MergePermissionConflict, 
739
 
                     builder.apply_changeset, cset)
740
 
        builder.cleanup()
741
 
 
742
 
def test():
    """Run every ``test_``-prefixed MergeTest case with a text runner."""
    suite = unittest.makeSuite(MergeTest, 'test_')
    unittest.TextTestRunner().run(suite)


# Allow the module to be executed directly to run its self-tests.
if __name__ == "__main__":
    test()