2
2
from changeset import Inventory, apply_changeset, invert_dict
4
from osutils import backup_file
5
from merge3 import Merge3
8
"""Contents-change wrapper around merge3.Merge3"""
9
def __init__(self, base_file, other_file):
10
self.base_file = base_file
11
self.other_file = other_file
13
def __eq__(self, other):
14
if not isinstance(other, ApplyMerge3):
16
return (self.base_file == other.base_file and
17
self.other_file == other.other_file)
19
def __ne__(self, other):
20
return not (self == other)
23
def apply(self, filename, conflict_handler, reverse=False):
24
new_file = filename+".new"
27
other = self.other_file
29
base = self.other_file
30
other = self.base_file
31
m3 = Merge3(file(base, "rb").readlines(),
32
file(filename, "rb").readlines(),
33
file(other, "rb").readlines())
36
output_file = file(new_file, "wb")
37
start_marker = "!START OF MERGE CONFLICT!" + "I HOPE THIS IS UNIQUE"
38
for line in m3.merge_lines(name_a = "TREE", name_b = "MERGE-SOURCE",
39
start_marker=start_marker):
40
if line.startswith(start_marker):
42
output_file.write(line.replace(start_marker, '<<<<<<<<'))
44
output_file.write(line)
47
os.chmod(new_file, os.stat(filename).st_mode)
48
os.rename(new_file, filename)
51
conflict_handler.merge_conflict(new_file, filename, base, other)
54
class BackupBeforeChange:
55
"""Contents-change wrapper to back up file first"""
56
def __init__(self, contents_change):
57
self.contents_change = contents_change
59
def __eq__(self, other):
60
if not isinstance(other, BackupBeforeChange):
62
return (self.contents_change == other.contents_change)
64
def __ne__(self, other):
65
return not (self == other)
67
def apply(self, filename, conflict_handler, reverse=False):
69
self.contents_change.apply(filename, conflict_handler, reverse)
5
72
class ThreewayInventory(object):
6
73
def __init__(self, this_inventory, base_inventory, other_inventory):
20
87
def merge_flex(this, base, other, changeset_function, inventory_function,
88
conflict_handler, merge_factory):
22
89
this_inventory = inventory_function(this)
23
90
base_inventory = inventory_function(base)
24
91
other_inventory = inventory_function(other)
27
94
make_inv(other_inventory))
28
95
cset = changeset_function(base, other, base_inventory, other_inventory)
29
96
new_cset = make_merge_changeset(cset, inventory, this, base, other,
97
conflict_handler, merge_factory)
31
98
result = apply_changeset(new_cset, invert_invent(this_inventory),
32
99
this.root, conflict_handler, False)
33
100
conflict_handler.finalize()
38
105
def make_merge_changeset(cset, inventory, this, base, other,
39
conflict_handler=None):
106
conflict_handler, merge_factory):
40
107
new_cset = changeset.Changeset()
41
108
def get_this_contents(id):
42
109
path = os.path.join(this.root, inventory.this.get_path(id))
52
119
new_entry = make_merged_entry(entry, inventory, conflict_handler)
53
120
new_contents = make_merged_contents(entry, this, base, other,
54
inventory, conflict_handler)
121
inventory, conflict_handler,
55
123
new_entry.contents_change = new_contents
56
124
new_entry.metadata_change = make_merged_metadata(entry, base, other)
57
125
new_cset.add_entry(new_entry)
119
def make_merged_contents(entry, this, base, other, inventory, conflict_handler):
187
def make_merged_contents(entry, this, base, other, inventory, conflict_handler,
120
189
contents = entry.contents_change
121
190
if contents is None:
123
192
this_path = this.readonly_path(entry.id)
125
194
if this_path is None:
126
195
return conflict_handler.missing_for_merge(entry.id, inventory)
127
196
base_path = base.readonly_path(entry.id)
128
197
other_path = other.readonly_path(entry.id)
129
return changeset.Diff3Merge(base_path, other_path)
198
return merge_factory(base_path, other_path)
131
200
if isinstance(contents, changeset.PatchApply):
133
202
if isinstance(contents, changeset.ReplaceContents):
134
203
if contents.old_contents is None and contents.new_contents is None:
161
230
other_path = other.readonly_path(entry.id)
162
231
return PermissionsMerge(base_path, other_path)
164
def get_merge_entry(entry, inventory, base, other, conflict_handler):
165
if entry.contents_change is not None:
166
new_entry.contents_change = changeset.Diff3Merge(base_path, other_path)
167
if entry.metadata_change is not None:
168
new_entry.metadata_change = PermissionsMerge(base_path, other_path)
172
234
class PermissionsMerge(object):
173
235
def __init__(self, base_path, other_path):
369
431
def change_perms_tree(self, id, tree, mode):
370
432
os.chmod(tree.full_path(id), mode)
372
def merge_changeset(self):
434
def merge_changeset(self, merge_factory):
373
435
all_inventory = ThreewayInventory(Inventory(self.this.inventory),
374
436
Inventory(self.base.inventory),
375
437
Inventory(self.other.inventory))
376
438
conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
377
439
return make_merge_changeset(self.cset, all_inventory, self.this,
378
self.base, self.other, conflict_handler)
440
self.base, self.other, conflict_handler,
380
443
def apply_inv_change(self, inventory_change, orig_inventory):
381
444
orig_inventory_by_path = {}
443
506
builder.change_name("2", base="name4")
444
507
builder.add_file("3", "0", "name5", "hello3", 0755)
445
508
builder.change_name("3", this="name6")
446
cset = builder.merge_changeset()
509
cset = builder.merge_changeset(ApplyMerge3)
447
510
assert(cset.entries["2"].is_boring())
448
511
assert(cset.entries["1"].name == "name1")
449
512
assert(cset.entries["1"].new_name == "name2")
462
525
builder.add_file("1", "0", "name1", "hello1", 0644)
463
526
builder.change_name("1", other="name2", this="name3")
464
527
self.assertRaises(changeset.RenameConflict,
465
builder.merge_changeset)
528
builder.merge_changeset, ApplyMerge3)
466
529
builder.cleanup()
468
531
def test_file_moves(self):
479
542
assert(Inventory(builder.this.inventory).get_parent("4") == "2")
480
543
builder.change_parent("5", base="2")
481
544
assert(Inventory(builder.base.inventory).get_parent("5") == "2")
482
cset = builder.merge_changeset()
545
cset = builder.merge_changeset(ApplyMerge3)
483
546
for id in ("1", "2", "4", "5"):
484
547
assert(cset.entries[id].is_boring())
485
548
assert(cset.entries["3"].parent == "1")
494
557
builder.add_file("4", "1", "file1", "hello1", 0644)
495
558
builder.change_parent("4", other="2", this="3")
496
559
self.assertRaises(changeset.MoveConflict,
497
builder.merge_changeset)
560
builder.merge_changeset, ApplyMerge3)
498
561
builder.cleanup()
500
563
def test_contents_merge(self):
501
"""Test diff3 merging"""
564
"""Test merge3 merging"""
565
self.do_contents_test(ApplyMerge3)
567
def test_contents_merge2(self):
568
"""Test diff3 merging"""
569
self.do_contents_test(changeset.Diff3Merge)
571
def test_contents_merge3(self):
572
"""Test diff3 merging"""
573
def backup_merge(base_file, other_file):
574
return BackupBeforeChange(ApplyMerge3(base_file, other_file))
575
builder = self.contents_test_success(backup_merge)
576
def backup_exists(file_id):
577
return os.path.exists(builder.this.full_path(file_id)+"~")
578
assert backup_exists("1")
579
assert backup_exists("2")
580
assert not backup_exists("3")
583
def do_contents_test(self, merge_factory):
584
"""Test merging with specified ContentsChange factory"""
585
builder = self.contents_test_success(merge_factory)
587
self.contents_test_conflicts(merge_factory)
589
def contents_test_success(self, merge_factory):
590
from inspect import isclass
502
591
builder = MergeBuilder()
503
592
builder.add_file("1", "0", "name1", "text1", 0755)
504
593
builder.change_contents("1", other="text4")
506
595
builder.change_contents("2", base="text5")
507
596
builder.add_file("3", "0", "name5", "text3", 0744)
508
597
builder.change_contents("3", this="text6")
509
cset = builder.merge_changeset()
598
cset = builder.merge_changeset(merge_factory)
510
599
assert(cset.entries["1"].contents_change is not None)
511
assert(isinstance(cset.entries["1"].contents_change,
512
changeset.Diff3Merge))
513
assert(isinstance(cset.entries["2"].contents_change,
514
changeset.Diff3Merge))
600
if isclass(merge_factory):
601
assert(isinstance(cset.entries["1"].contents_change,
603
assert(isinstance(cset.entries["2"].contents_change,
515
605
assert(cset.entries["3"].is_boring())
516
606
builder.apply_changeset(cset)
517
607
assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
519
609
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
520
610
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
521
611
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
614
def contents_test_conflicts(self, merge_factory):
524
615
builder = MergeBuilder()
525
616
builder.add_file("1", "0", "name1", "text1", 0755)
526
617
builder.change_contents("1", other="text4", this="text3")
527
cset = builder.merge_changeset()
618
cset = builder.merge_changeset(merge_factory)
528
619
self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
530
621
builder.cleanup()
534
625
builder.change_contents("1", other="text4", base="text3")
535
626
builder.remove_file("1", base=True)
536
627
self.assertRaises(changeset.NewContentsConflict,
537
builder.merge_changeset)
628
builder.merge_changeset, merge_factory)
538
629
builder.cleanup()
540
631
builder = MergeBuilder()
541
632
builder.add_file("1", "0", "name1", "text1", 0755)
542
633
builder.change_contents("1", other="text4", base="text3")
543
634
builder.remove_file("1", this=True)
544
self.assertRaises(changeset.MissingForMerge, builder.merge_changeset)
635
self.assertRaises(changeset.MissingForMerge, builder.merge_changeset,
545
637
builder.cleanup()
547
639
def test_perms_merge(self):
552
644
builder.change_perms("2", base=0655)
553
645
builder.add_file("3", "0", "name3", "text3", 0755)
554
646
builder.change_perms("3", this=0655)
555
cset = builder.merge_changeset()
647
cset = builder.merge_changeset(ApplyMerge3)
556
648
assert(cset.entries["1"].metadata_change is not None)
557
649
assert(isinstance(cset.entries["1"].metadata_change,
558
650
PermissionsMerge))
567
659
builder = MergeBuilder()
568
660
builder.add_file("1", "0", "name1", "text1", 0755)
569
661
builder.change_perms("1", other=0655, base=0555)
570
cset = builder.merge_changeset()
662
cset = builder.merge_changeset(ApplyMerge3)
571
663
self.assertRaises(changeset.MergePermissionConflict,
572
664
builder.apply_changeset, cset)
573
665
builder.cleanup()