277
277
conflict_handler.permission_conflict(filename, base, other)
283
from bzrlib.inventory import InventoryEntry, RootEntry
284
from osutils import file_kind
285
class FalseTree(object):
286
def __init__(self, realtree):
    """Wrap realtree and expose this object as its own inventory.

    realtree: the tree object this wrapper delegates to; it is stored
        unchanged on self._realtree.
    """
    self._realtree = realtree
    # Anything that reads ``.inventory`` from this object gets the wrapper
    # itself back, so inventory lookups are answered by the wrapper.
    self.inventory = self
290
def __getitem__(self, file_id):
291
entry = self.make_inventory_entry(file_id)
293
raise KeyError(file_id)
296
def make_inventory_entry(self, file_id):
297
path = self._realtree.inventory.get(file_id)
301
return RootEntry(file_id)
302
dir, name = os.path.split(path)
303
kind = file_kind(self._realtree.abs_path(path))
304
for parent_id, path in self._realtree.inventory.iteritems():
308
raise Exception("Can't find parent for %s" % name)
309
return InventoryEntry(file_id, name, kind, parent_id)
312
class MergeTree(object):
313
def __init__(self, dir):
316
self.inventory = {'0': ""}
317
self.tree = FalseTree(self)
319
def child_path(self, parent, name):
    """Return the tree-relative path of entry name inside parent.

    parent: file id of the containing directory; it must be a key of
        self.inventory (KeyError otherwise).
    name: the entry's own name within that directory.
    """
    parent_path = self.inventory[parent]
    return os.path.join(parent_path, name)
322
def add_file(self, id, parent, name, contents, mode):
    """Create a new file on disk and record it in the inventory.

    id: file id under which the new path is stored in self.inventory.
    parent: file id of the containing directory (resolved by child_path).
    name: name of the file within its parent.
    contents: data written to the file, which is opened in binary mode.
    mode: permission bits applied with os.chmod after the write.

    Asserts that nothing exists at the target path beforehand.
    """
    path = self.child_path(parent, name)
    full_path = self.abs_path(path)
    assert not os.path.exists(full_path)
    # Close the handle explicitly so the data is flushed before chmod,
    # rather than relying on garbage collection of an anonymous file object.
    with open(full_path, "wb") as out:
        out.write(contents)
    os.chmod(full_path, mode)
    self.inventory[id] = path
330
def remove_file(self, id):
    """Delete the file for id from disk and drop it from the inventory.

    id: file id of the entry to remove; its on-disk location comes from
        self.full_path(id).
    """
    os.unlink(self.full_path(id))
    del self.inventory[id]
334
def add_dir(self, id, parent, name, mode):
    """Create a new directory on disk and record it in the inventory.

    id: file id under which the new path is stored in self.inventory.
    parent: file id of the containing directory (resolved by child_path).
    name: name of the directory within its parent.
    mode: permission bits applied with os.chmod after creation.

    Asserts that nothing exists at the target path beforehand.
    """
    path = self.child_path(parent, name)
    full_path = self.abs_path(path)
    assert not os.path.exists(full_path)
    # Reuse the already-computed absolute path for both filesystem calls.
    os.mkdir(full_path)
    os.chmod(full_path, mode)
    self.inventory[id] = path
342
def abs_path(self, path):
    """Return path joined onto this tree's directory, self.dir.

    path: a path relative to the tree's directory.
    """
    return os.path.join(self.dir, path)
345
def full_path(self, id):
347
tree_path = self.inventory[id]
350
return self.abs_path(tree_path)
352
def readonly_path(self, id):
    """Return the path to read the file id from.

    This is the same location that self.full_path(id) reports.
    """
    return self.full_path(id)
355
def __contains__(self, file_id):
    """Return True when file_id is a key of this tree's inventory."""
    return file_id in self.inventory
358
def has_or_had_id(self, file_id):
    """Return whether file_id is in this tree.

    The answer is exactly the tree's membership test (``file_id in self``).
    """
    return file_id in self
361
def get_file(self, file_id):
    """Return an open binary read handle on the file file_id.

    The file is located through self.readonly_path(file_id). The caller
    owns the returned handle and is responsible for closing it.
    """
    path = self.readonly_path(file_id)
    # open() is the documented way to open files; the file() constructor
    # is not meant to be called directly.
    return open(path, "rb")
365
def id2path(self, file_id):
    """Return the inventory path stored for file_id.

    Raises KeyError when file_id is not in self.inventory.
    """
    return self.inventory[file_id]
368
def change_path(self, id, path):
    """Move the entry id to path on disk and in the inventory.

    id: file id of the entry to move; it must already be in
        self.inventory.
    path: the new path, relative to the tree's directory.
    """
    # The original also computed an unused local from self.dir here; it had
    # no effect on the result and has been dropped.
    old_full_path = self.abs_path(self.inventory[id])
    os.rename(old_full_path, self.abs_path(path))
    self.inventory[id] = path
373
class MergeBuilder(object):
375
self.dir = tempfile.mkdtemp(prefix="BaZing")
376
self.base = MergeTree(os.path.join(self.dir, "base"))
377
self.this = MergeTree(os.path.join(self.dir, "this"))
378
self.other = MergeTree(os.path.join(self.dir, "other"))
380
self.cset = changeset.Changeset()
381
self.cset.add_entry(changeset.ChangesetEntry("0",
382
changeset.NULL_ID, "./."))
383
def get_cset_path(self, parent, name):
385
assert (parent is None)
387
return os.path.join(self.cset.entries[parent].path, name)
389
def add_file(self, id, parent, name, contents, mode):
    """Add the same file to the base, this and other trees and to the changeset.

    id: file id for the new entry.
    parent: file id of the containing directory.
    name: name of the file within its parent.
    contents: data written to the file in each tree.
    mode: permission bits given to the file in each tree.
    """
    # Create the file in each of the three trees in turn.
    for tree in (self.base, self.this, self.other):
        tree.add_file(id, parent, name, contents, mode)
    # Register a changeset entry for the new file at its changeset path.
    path = self.get_cset_path(parent, name)
    self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
396
def remove_file(self, id, base=False, this=False, other=False):
397
for option, tree in ((base, self.base), (this, self.this),
398
(other, self.other)):
402
change = self.cset.entries[id].contents_change
404
change = changeset.ReplaceContents(None, None)
405
self.cset.entries[id].contents_change = change
406
def create_file(tree):
407
return changeset.FileCreate(tree.get_file(id).read())
409
change.new_contents = create_file(self.other)
411
change.old_contents = create_file(self.base)
413
assert isinstance(change, changeset.ReplaceContents)
415
change.new_contents=None
417
change.old_contents=None
418
if change.old_contents is None and change.new_contents is None:
422
def add_dir(self, id, parent, name, mode):
    """Add the same directory to the base, this and other trees and to the changeset.

    id: file id for the new entry.
    parent: file id of the containing directory.
    name: name of the directory within its parent.
    mode: permission bits given to the directory in each tree.
    """
    path = self.get_cset_path(parent, name)
    self.base.add_dir(id, parent, name, mode)
    # The changeset entry is registered right after the base tree is
    # updated, then the remaining two trees are brought in line.
    self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
    self.this.add_dir(id, parent, name, mode)
    self.other.add_dir(id, parent, name, mode)
430
def change_name(self, id, base=None, this=None, other=None):
432
self.change_name_tree(id, self.base, base)
433
self.cset.entries[id].name = base
436
self.change_name_tree(id, self.this, this)
438
if other is not None:
439
self.change_name_tree(id, self.other, other)
440
self.cset.entries[id].new_name = other
442
def change_parent(self, id, base=None, this=None, other=None):
444
self.change_parent_tree(id, self.base, base)
445
self.cset.entries[id].parent = base
446
self.cset.entries[id].dir = self.cset.entries[base].path
449
self.change_parent_tree(id, self.this, this)
451
if other is not None:
452
self.change_parent_tree(id, self.other, other)
453
self.cset.entries[id].new_parent = other
454
self.cset.entries[id].new_dir = \
455
self.cset.entries[other].new_path
457
def change_contents(self, id, base=None, this=None, other=None):
459
self.change_contents_tree(id, self.base, base)
462
self.change_contents_tree(id, self.this, this)
464
if other is not None:
465
self.change_contents_tree(id, self.other, other)
467
if base is not None or other is not None:
468
old_contents = file(self.base.full_path(id)).read()
469
new_contents = file(self.other.full_path(id)).read()
470
contents = changeset.ReplaceFileContents(old_contents,
472
self.cset.entries[id].contents_change = contents
474
def change_perms(self, id, base=None, this=None, other=None):
476
self.change_perms_tree(id, self.base, base)
479
self.change_perms_tree(id, self.this, this)
481
if other is not None:
482
self.change_perms_tree(id, self.other, other)
484
if base is not None or other is not None:
485
old_perms = os.stat(self.base.full_path(id)).st_mode &077
486
new_perms = os.stat(self.other.full_path(id)).st_mode &077
487
contents = changeset.ChangeUnixPermissions(old_perms,
489
self.cset.entries[id].metadata_change = contents
491
def change_name_tree(self, id, tree, name):
    """Rename entry id to name within tree, keeping its parent.

    id: file id of the entry to rename.
    tree: the tree to change; it must provide child_path and change_path.
    name: the new name; it is joined onto the parent recorded for id in
        self.cset.entries.
    """
    parent = self.cset.entries[id].parent
    new_path = tree.child_path(parent, name)
    tree.change_path(id, new_path)
495
def change_parent_tree(self, id, tree, parent):
    """Move entry id under parent within tree, keeping its name.

    id: file id of the entry to move.
    tree: the tree to change; it must provide child_path and change_path.
    parent: file id of the new containing directory; the name recorded
        for id in self.cset.entries is joined onto it.
    """
    name = self.cset.entries[id].name
    new_path = tree.child_path(parent, name)
    tree.change_path(id, new_path)
499
def change_contents_tree(self, id, tree, contents):
500
path = tree.full_path(id)
501
mode = os.stat(path).st_mode
502
file(path, "w").write(contents)
505
def change_perms_tree(self, id, tree, mode):
    """Set the permission bits of entry id in tree to mode.

    id: file id of the entry to change.
    tree: the tree holding the entry; its full_path(id) locates the file.
    mode: permission bits passed to os.chmod.
    """
    os.chmod(tree.full_path(id), mode)
508
def merge_changeset(self, merge_factory):
509
conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
510
return make_merge_changeset(self.cset, self.this, self.base,
511
self.other, conflict_handler,
514
def apply_inv_change(self, inventory_change, orig_inventory):
515
orig_inventory_by_path = {}
516
for file_id, path in orig_inventory.iteritems():
517
orig_inventory_by_path[path] = file_id
519
def parent_id(file_id):
521
parent_dir = os.path.dirname(orig_inventory[file_id])
527
return orig_inventory_by_path[parent_dir]
529
def new_path(file_id):
530
if inventory_change.has_key(file_id):
531
return inventory_change[file_id]
533
parent = parent_id(file_id)
535
return orig_inventory[file_id]
536
dirname = new_path(parent)
537
return os.path.join(dirname, orig_inventory[file_id])
540
for file_id in orig_inventory.iterkeys():
541
path = new_path(file_id)
544
new_inventory[file_id] = path
546
for file_id, path in inventory_change.iteritems():
547
if orig_inventory.has_key(file_id):
549
new_inventory[file_id] = path
554
def apply_changeset(self, cset, conflict_handler=None, reverse=False):
555
inventory_change = changeset.apply_changeset(cset,
558
conflict_handler, reverse)
559
self.this.inventory = self.apply_inv_change(inventory_change,
567
shutil.rmtree(self.dir)
570
class MergeTest(unittest.TestCase):
571
def test_change_name(self):
573
builder = MergeBuilder()
574
builder.add_file("1", "0", "name1", "hello1", 0755)
575
builder.change_name("1", other="name2")
576
builder.add_file("2", "0", "name3", "hello2", 0755)
577
builder.change_name("2", base="name4")
578
builder.add_file("3", "0", "name5", "hello3", 0755)
579
builder.change_name("3", this="name6")
580
cset = builder.merge_changeset(ApplyMerge3)
581
assert(cset.entries["2"].is_boring())
582
assert(cset.entries["1"].name == "name1")
583
assert(cset.entries["1"].new_name == "name2")
584
assert(cset.entries["3"].is_boring())
585
for tree in (builder.this, builder.other, builder.base):
586
assert(tree.dir != builder.dir and
587
tree.dir.startswith(builder.dir))
588
for path in tree.inventory.itervalues():
589
fullpath = tree.abs_path(path)
590
assert(fullpath.startswith(tree.dir))
591
assert(not path.startswith(tree.dir))
592
assert os.path.exists(fullpath)
593
builder.apply_changeset(cset)
595
builder = MergeBuilder()
596
builder.add_file("1", "0", "name1", "hello1", 0644)
597
builder.change_name("1", other="name2", this="name3")
598
self.assertRaises(changeset.RenameConflict,
599
builder.merge_changeset, ApplyMerge3)
602
def test_file_moves(self):
604
builder = MergeBuilder()
605
builder.add_dir("1", "0", "dir1", 0755)
606
builder.add_dir("2", "0", "dir2", 0755)
607
builder.add_file("3", "1", "file1", "hello1", 0644)
608
builder.add_file("4", "1", "file2", "hello2", 0644)
609
builder.add_file("5", "1", "file3", "hello3", 0644)
610
builder.change_parent("3", other="2")
611
assert(Inventory(builder.other.inventory).get_parent("3") == "2")
612
builder.change_parent("4", this="2")
613
assert(Inventory(builder.this.inventory).get_parent("4") == "2")
614
builder.change_parent("5", base="2")
615
assert(Inventory(builder.base.inventory).get_parent("5") == "2")
616
cset = builder.merge_changeset(ApplyMerge3)
617
for id in ("1", "2", "4", "5"):
618
assert(cset.entries[id].is_boring())
619
assert(cset.entries["3"].parent == "1")
620
assert(cset.entries["3"].new_parent == "2")
621
builder.apply_changeset(cset)
624
builder = MergeBuilder()
625
builder.add_dir("1", "0", "dir1", 0755)
626
builder.add_dir("2", "0", "dir2", 0755)
627
builder.add_dir("3", "0", "dir3", 0755)
628
builder.add_file("4", "1", "file1", "hello1", 0644)
629
builder.change_parent("4", other="2", this="3")
630
self.assertRaises(changeset.MoveConflict,
631
builder.merge_changeset, ApplyMerge3)
634
def test_contents_merge(self):
    """Test merge3 merging"""
    # Run the shared contents-merge scenario with ApplyMerge3 as the factory.
    self.do_contents_test(ApplyMerge3)
638
def test_contents_merge2(self):
    """Test diff3 merging"""
    # Run the shared contents-merge scenario with changeset.Diff3Merge as the
    # factory.
    self.do_contents_test(changeset.Diff3Merge)
642
def test_contents_merge3(self):
643
"""Test diff3 merging"""
644
def backup_merge(file_id, base, other):
645
return BackupBeforeChange(ApplyMerge3(file_id, base, other))
646
builder = self.contents_test_success(backup_merge)
647
def backup_exists(file_id):
648
return os.path.exists(builder.this.full_path(file_id)+"~")
649
assert backup_exists("1")
650
assert backup_exists("2")
651
assert not backup_exists("3")
654
def do_contents_test(self, merge_factory):
655
"""Test merging with specified ContentsChange factory"""
656
builder = self.contents_test_success(merge_factory)
658
self.contents_test_conflicts(merge_factory)
660
def contents_test_success(self, merge_factory):
661
from inspect import isclass
662
builder = MergeBuilder()
663
builder.add_file("1", "0", "name1", "text1", 0755)
664
builder.change_contents("1", other="text4")
665
builder.add_file("2", "0", "name3", "text2", 0655)
666
builder.change_contents("2", base="text5")
667
builder.add_file("3", "0", "name5", "text3", 0744)
668
builder.add_file("4", "0", "name6", "text4", 0744)
669
builder.remove_file("4", base=True)
670
assert not builder.cset.entries["4"].is_boring()
671
builder.change_contents("3", this="text6")
672
cset = builder.merge_changeset(merge_factory)
673
assert(cset.entries["1"].contents_change is not None)
674
if isclass(merge_factory):
675
assert(isinstance(cset.entries["1"].contents_change,
677
assert(isinstance(cset.entries["2"].contents_change,
679
assert(cset.entries["3"].is_boring())
680
assert(cset.entries["4"].is_boring())
681
builder.apply_changeset(cset)
682
assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
683
assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
684
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
685
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
686
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
689
def contents_test_conflicts(self, merge_factory):
690
builder = MergeBuilder()
691
builder.add_file("1", "0", "name1", "text1", 0755)
692
builder.change_contents("1", other="text4", this="text3")
693
cset = builder.merge_changeset(merge_factory)
694
self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
698
builder = MergeBuilder()
699
builder.add_file("1", "0", "name1", "text1", 0755)
700
builder.change_contents("1", other="text4", base="text3")
701
builder.remove_file("1", base=True)
702
self.assertRaises(changeset.NewContentsConflict,
703
builder.merge_changeset, merge_factory)
706
builder = MergeBuilder()
707
builder.add_file("1", "0", "name1", "text1", 0755)
708
builder.change_contents("1", other="text4", base="text3")
709
builder.remove_file("1", this=True)
710
self.assertRaises(changeset.MissingForMerge, builder.merge_changeset,
714
def test_perms_merge(self):
715
builder = MergeBuilder()
716
builder.add_file("1", "0", "name1", "text1", 0755)
717
builder.change_perms("1", other=0655)
718
builder.add_file("2", "0", "name2", "text2", 0755)
719
builder.change_perms("2", base=0655)
720
builder.add_file("3", "0", "name3", "text3", 0755)
721
builder.change_perms("3", this=0655)
722
cset = builder.merge_changeset(ApplyMerge3)
723
assert(cset.entries["1"].metadata_change is not None)
724
assert(isinstance(cset.entries["1"].metadata_change,
726
assert(isinstance(cset.entries["2"].metadata_change,
728
assert(cset.entries["3"].is_boring())
729
builder.apply_changeset(cset)
730
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
731
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
732
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
734
builder = MergeBuilder()
735
builder.add_file("1", "0", "name1", "text1", 0755)
736
builder.change_perms("1", other=0655, base=0555)
737
cset = builder.merge_changeset(ApplyMerge3)
738
self.assertRaises(changeset.MergePermissionConflict,
739
builder.apply_changeset, cset)
743
changeset_suite = unittest.makeSuite(MergeTest, 'test_')
744
runner = unittest.TextTestRunner()
745
runner.run(changeset_suite)
747
if __name__ == "__main__":