2
from changeset import Inventory, apply_changeset, invert_dict
5
class ThreewayInventory:
    """Bundle the three inventories that take part in a three-way merge.

    The constructor arguments are stored as-is on the attributes `this`,
    `base` and `other`; nothing is copied or validated.
    """

    def __init__(self, this_inventory, base_inventory, other_inventory):
        """Remember the THIS, BASE and OTHER inventories."""
        self.this = this_inventory
        self.base = base_inventory
        self.other = other_inventory
10
def invert_invent(inventory):
    """Return a dict mapping each entry's id to its key in *inventory*.

    :param inventory: object providing iteritems() that yields
        (key, entry) pairs, where every entry has an `id` attribute
    :return: a new dict {entry.id: key}; when several entries share an id
        the pair iterated last wins
    """
    # Accumulate into a local dict and return it: make_inv() and
    # merge_flex() both consume this function's return value.
    inverted = {}
    for key, value in inventory.iteritems():
        inverted[value.id] = key
    return inverted
16
def make_inv(inventory):
    """Build an Inventory from *inventory* re-keyed by entry id.

    The mapping is inverted with invert_invent() and the result is passed
    to the Inventory constructor imported from the changeset module.
    """
    return Inventory(invert_invent(inventory))
20
def merge_flex(this, base, other, changeset_function, inventory_function,
               conflict_handler):
    """Merge the changes between *base* and *other* into *this*.

    :param this: tree receiving the merge; its `root` attribute is handed
        to apply_changeset
    :param base: tree the changes are computed from
    :param other: tree the changes are computed to
    :param changeset_function: callable invoked as
        changeset_function(base, other, base_inventory, other_inventory)
        to produce the BASE -> OTHER changeset
    :param inventory_function: callable returning the inventory of a tree
    :param conflict_handler: forwarded to make_merge_changeset and
        apply_changeset
    :return: the result of apply_changeset
    """
    this_inventory = inventory_function(this)
    base_inventory = inventory_function(base)
    other_inventory = inventory_function(other)
    inventory = ThreewayInventory(make_inv(this_inventory),
                                  make_inv(base_inventory),
                                  make_inv(other_inventory))
    cset = changeset_function(base, other, base_inventory, other_inventory)
    new_cset = make_merge_changeset(cset, inventory, this, base, other,
                                    conflict_handler)
    # The final False is passed positionally; in MergeBuilder.apply_changeset
    # the argument in the same position is `reverse`.
    return apply_changeset(new_cset, invert_invent(this_inventory), this.root,
                           conflict_handler, False)
36
def make_merge_changeset(cset, inventory, this, base, other,
37
conflict_handler=None):
# Build a new changeset.Changeset from the entries of cset, deciding per
# entry whether to copy it, substitute another entry, or report a conflict
# through conflict_handler.
# NOTE(review): the bare integers interleaved with the code look like line
# numbers of the original file, and they skip values (42 -> 44, 46 -> 48,
# 86 -> 90, ...). Branch keywords, call continuations and any return
# statement that sat on the skipped lines are not visible here -- confirm
# against the complete file before changing this function.
38
new_cset = changeset.Changeset()
39
# Helper: creation object describing id as it currently exists under
# this.root (changeset.dir_create for a directory, FileCreate otherwise).
def get_this_contents(id):
40
path = os.path.join(this.root, inventory.this.get_path(id))
41
if os.path.isdir(path):
42
return changeset.dir_create
44
# NOTE(review): the handle opened by file() is never closed explicitly.
return changeset.FileCreate(file(path, "rb").read())
46
for entry in cset.entries.itervalues():
48
new_cset.add_entry(entry)
49
# Creation entries: copied unchanged when THIS has no path for the id;
# otherwise the contents in THIS are compared with the created contents.
elif entry.is_creation(False):
50
if inventory.this.get_path(entry.id) is None:
51
new_cset.add_entry(entry)
53
this_contents = get_this_contents(entry.id)
54
other_contents = entry.contents_change.new_contents
55
if other_contents == this_contents:
56
boring_entry = changeset.ChangesetEntry(entry.id,
59
new_cset.add_entry(boring_entry)
61
conflict_handler.contents_conflict(this_contents,
64
# Deletion entries: when THIS has no path for the id a freshly built entry
# is recorded in place of the deletion.
elif entry.is_deletion(False):
65
if inventory.this.get_path(entry.id) is None:
66
boring_entry = changeset.ChangesetEntry(entry.id, entry.parent,
68
new_cset.add_entry(boring_entry)
69
elif entry.contents_change is not None:
70
this_contents = get_this_contents(entry.id)
71
base_contents = entry.contents_change.old_contents
72
# THIS still holds the contents being deleted, so the deletion is kept.
if base_contents == this_contents:
73
new_cset.add_entry(entry)
75
entry_path = inventory.this.get_path(entry.id)
76
conflict_handler.rem_contents_conflict(entry_path,
81
new_cset.add_entry(entry)
83
# Remaining entries are rebuilt by get_merge_entry().
entry = get_merge_entry(entry, inventory, base, other,
86
new_cset.add_entry(entry)
90
def get_merge_entry(entry, inventory, base, other, conflict_handler):
# Build the merged changeset entry for entry.id from the name, parent and
# directory recorded for that id in the THIS, BASE and OTHER inventories.
# NOTE(review): the bare integers interleaved with the code look like line
# numbers of the original file, and they skip values (93 -> 97, 110 -> 114,
# 148 -> 152, ...). The assignments of old_name, old_dir and new_dir, the
# conditions guarding several statements and any return statement are not
# visible here -- confirm against the complete file before editing.
91
this_name = inventory.this.get_name(entry.id)
92
this_parent = inventory.this.get_parent(entry.id)
93
this_dir = inventory.this.get_dir(entry.id)
97
return conflict_handler.merge_missing(entry.id, inventory)
99
base_name = inventory.base.get_name(entry.id)
100
base_parent = inventory.base.get_parent(entry.id)
101
base_dir = inventory.base.get_dir(entry.id)
104
other_name = inventory.other.get_name(entry.id)
105
other_parent = inventory.other.get_parent(entry.id)
106
# NOTE(review): other_name and other_parent are read from inventory.other,
# but this line reads inventory.base. It looks like a copy/paste slip;
# presumably inventory.other.get_dir(entry.id) was intended -- confirm.
other_dir = inventory.base.get_dir(entry.id)
107
if other_dir is None:
110
# Name identical in BASE and OTHER.
if base_name == other_name:
114
# THIS carries a name that matches neither BASE nor OTHER: rename conflict.
if this_name != base_name and this_name != other_name:
115
conflict_handler.rename_conflict(entry.id, this_name, base_name,
119
new_name = other_name
121
# Parent identical in BASE and OTHER: THIS keeps its parent.
if base_parent == other_parent:
122
old_parent = this_parent
123
new_parent = this_parent
127
# THIS carries a parent that matches neither BASE nor OTHER: move conflict.
if this_parent != base_parent and this_parent != other_parent:
128
conflict_handler.move_conflict(entry.id, inventory)
130
old_parent = this_parent
132
new_parent = other_parent
134
# NOTE(review): old_path is not read again in the visible lines.
old_path = os.path.join(old_dir, old_name)
135
new_entry = changeset.ChangesetEntry(entry.id, old_parent, old_name)
136
if new_name is not None or new_parent is not None:
137
new_entry.new_path = os.path.join(new_dir, new_name)
139
new_entry.new_path = None
140
new_entry.new_parent = new_parent
142
base_path = base.readonly_path(entry.id)
143
other_path = other.readonly_path(entry.id)
145
# A contents change becomes a Diff3Merge and a metadata change a
# PermissionsMerge, each built from the BASE and OTHER read-only paths.
if entry.contents_change is not None:
146
new_entry.contents_change = changeset.Diff3Merge(base_path, other_path)
147
if entry.metadata_change is not None:
148
new_entry.metadata_change = PermissionsMerge(base_path, other_path)
152
class PermissionsMerge:
    """Three-way merge of Unix permission bits.

    Holds the paths of the BASE and OTHER versions of a file; apply()
    compares their modes with the mode of a target file and either updates
    the target or reports a conflict.
    """

    def __init__(self, base_path, other_path):
        """Remember the paths of the BASE and OTHER versions."""
        self.base_path = base_path
        self.other_path = other_path

    def apply(self, filename, conflict_handler, reverse=False):
        """Bring the permissions of *filename* in line with BASE -> OTHER.

        Only the rwx bits for user, group and other (octal 777) are
        compared:

        * BASE and OTHER agree: nothing to merge.
        * filename already matches OTHER: nothing to do.
        * filename still matches BASE: chmod it to OTHER's mode.
        * otherwise conflict_handler.permission_conflict(filename, base,
          other) is called.

        :param filename: path of the file whose mode is merged
        :param conflict_handler: object providing permission_conflict();
            only used when a conflict is detected
        :param reverse: when true, BASE and OTHER swap roles
        """
        import stat  # local import keeps the block self-contained

        # Octal 777, spelled with stat constants so the same source is
        # valid on every Python version.
        perm_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

        if not reverse:
            base = self.base_path
            other = self.other_path
        else:
            base = self.other_path
            other = self.base_path
        base_stat = os.stat(base).st_mode
        other_stat = os.stat(other).st_mode
        this_stat = os.stat(filename).st_mode
        if base_stat & perm_mask == other_stat & perm_mask:
            return
        elif this_stat & perm_mask == other_stat & perm_mask:
            return
        elif this_stat & perm_mask == base_stat & perm_mask:
            os.chmod(filename, other_stat)
        else:
            conflict_handler.permission_conflict(filename, base, other)
181
def __init__(self, dir):
    """Create the tree's root directory and an inventory holding the root.

    :param dir: path of the directory to create for this tree; it must not
        exist yet (os.mkdir raises OSError otherwise)
    """
    # abs_path() resolves every inventory path against self.dir.
    self.dir = dir
    # add_file()/add_dir() create entries below this directory, so it has
    # to exist before the first entry is added.
    os.mkdir(dir)
    # Id '0' is the root entry; its tree-relative path is the empty string.
    self.inventory = {'0': ""}
186
def child_path(self, parent, name):
    """Return the tree-relative path of *name* below the entry *parent*.

    :param parent: id of an entry already present in self.inventory
    :param name: name to join onto that entry's path
    :raises KeyError: if *parent* is not in the inventory
    """
    return os.path.join(self.inventory[parent], name)
189
def add_file(self, id, parent, name, contents, mode):
    """Create a file in the tree and register it in the inventory.

    :param id: inventory id for the new file
    :param parent: id of the entry the file is created under
    :param name: file name
    :param contents: data written to the file (opened in binary mode)
    :param mode: permission bits applied with os.chmod
    """
    path = self.child_path(parent, name)
    full_path = self.abs_path(path)
    assert not os.path.exists(full_path)
    # Close the handle explicitly instead of relying on garbage collection
    # to flush the data before chmod and later reads.
    out = open(full_path, "wb")
    try:
        out.write(contents)
    finally:
        out.close()
    os.chmod(full_path, mode)
    self.inventory[id] = path
197
def add_dir(self, id, parent, name, mode):
    """Create a directory in the tree and register it in the inventory.

    :param id: inventory id for the new directory
    :param parent: id of the entry the directory is created under
    :param name: directory name
    :param mode: permission bits applied with os.chmod
    """
    path = self.child_path(parent, name)
    # Resolve the absolute path once and reuse it below.
    full_path = self.abs_path(path)
    assert not os.path.exists(full_path)
    os.mkdir(full_path)
    os.chmod(full_path, mode)
    self.inventory[id] = path
205
def abs_path(self, path):
    """Return *path* joined onto the tree's root directory (self.dir)."""
    return os.path.join(self.dir, path)
208
def full_path(self, id):
    """Return the path of entry *id* resolved through abs_path().

    :raises KeyError: if *id* is not in the inventory
    """
    return self.abs_path(self.inventory[id])
211
def change_path(self, id, path):
    """Move entry *id* to the tree-relative *path* on disk and in the inventory.

    :param id: id of an entry already present in self.inventory
    :param path: new tree-relative path for the entry
    """
    # The original also computed the current location into an unused local
    # named `new`; that dead assignment is dropped.
    old_full_path = self.abs_path(self.inventory[id])
    os.rename(old_full_path, self.abs_path(path))
    self.inventory[id] = path
218
# NOTE(review): the `def` line (and any class header) for these statements
# is not visible; the interleaved original line numbers jump from 214 to
# 218. They read like the body of MergeBuilder's constructor, since the
# tests call MergeBuilder() without arguments -- confirm.
# Scratch directory that holds the three trees.
self.dir = tempfile.mkdtemp(prefix="BaZing")
219
# BASE, THIS and OTHER trees live in sub-directories of the scratch dir.
self.base = MergeTree(os.path.join(self.dir, "base"))
220
self.this = MergeTree(os.path.join(self.dir, "this"))
221
self.other = MergeTree(os.path.join(self.dir, "other"))
223
# The changeset starts out with a single entry for the root id "0".
self.cset = changeset.Changeset()
224
self.cset.add_entry(changeset.ChangesetEntry("0",
225
changeset.NULL_ID, "./."))
226
def get_cset_path(self, parent, name):
# Returns name joined onto the changeset path recorded for parent.
# NOTE(review): original lines 227 and 229 are absent from this view. As
# shown, the assert demands parent is None while the return statement
# looks parent up in self.cset.entries, so the missing lines presumably
# make the assert conditional -- confirm against the complete file.
228
assert (parent is None)
230
return os.path.join(self.cset.entries[parent].path, name)
232
def add_file(self, id, parent, name, contents, mode):
    """Add an identical file to BASE, THIS and OTHER and to the changeset.

    :param id: id of the new file
    :param parent: id of the entry the file is created under
    :param name: file name
    :param contents: data written to the file in each tree
    :param mode: permission bits for the file in each tree
    """
    self.base.add_file(id, parent, name, contents, mode)
    self.this.add_file(id, parent, name, contents, mode)
    self.other.add_file(id, parent, name, contents, mode)
    path = self.get_cset_path(parent, name)
    self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
239
def add_dir(self, id, parent, name, mode):
    """Add an identical directory to BASE, THIS and OTHER and to the changeset.

    :param id: id of the new directory
    :param parent: id of the entry the directory is created under
    :param name: directory name
    :param mode: permission bits for the directory in each tree
    """
    path = self.get_cset_path(parent, name)
    self.base.add_dir(id, parent, name, mode)
    self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
    self.this.add_dir(id, parent, name, mode)
    self.other.add_dir(id, parent, name, mode)
247
def change_name(self, id, base=None, this=None, other=None):
    """Rename entry *id* in any of the three trees.

    Each of *base*, *this* and *other* is the new name for the entry in
    that tree; None leaves the tree untouched. A BASE rename updates the
    changeset entry's `name`, an OTHER rename its `new_name`; a THIS rename
    only touches the THIS tree.
    """
    if base is not None:
        self.change_name_tree(id, self.base, base)
        self.cset.entries[id].name = base
    if this is not None:
        self.change_name_tree(id, self.this, this)
    if other is not None:
        self.change_name_tree(id, self.other, other)
        self.cset.entries[id].new_name = other
259
def change_parent(self, id, base=None, this=None, other=None):
    """Move entry *id* under a different parent in any of the three trees.

    Each of *base*, *this* and *other* is the id of the new parent in that
    tree; None leaves the tree untouched. A BASE move updates the changeset
    entry's `parent` and `dir`, an OTHER move its `new_parent` and
    `new_dir`; a THIS move only touches the THIS tree.
    """
    if base is not None:
        self.change_parent_tree(id, self.base, base)
        self.cset.entries[id].parent = base
        self.cset.entries[id].dir = self.cset.entries[base].path
    if this is not None:
        self.change_parent_tree(id, self.this, this)
    if other is not None:
        self.change_parent_tree(id, self.other, other)
        self.cset.entries[id].new_parent = other
        self.cset.entries[id].new_dir = \
            self.cset.entries[other].new_path
274
def change_contents(self, id, base=None, this=None, other=None):
    """Replace the contents of file *id* in any of the three trees.

    Each of *base*, *this* and *other* is the new contents for that tree;
    None leaves the tree untouched. When BASE or OTHER changed, the
    changeset entry's `contents_change` is rebuilt from the files now on
    disk in BASE and OTHER.
    """
    def read_contents(path):
        # Read through an explicitly closed handle rather than a bare
        # file(path).read(), which leaves closing to garbage collection.
        handle = open(path)
        try:
            return handle.read()
        finally:
            handle.close()

    if base is not None:
        self.change_contents_tree(id, self.base, base)
    if this is not None:
        self.change_contents_tree(id, self.this, this)
    if other is not None:
        self.change_contents_tree(id, self.other, other)
    if base is not None or other is not None:
        old_contents = read_contents(self.base.full_path(id))
        new_contents = read_contents(self.other.full_path(id))
        contents = changeset.ReplaceFileContents(old_contents,
                                                 new_contents)
        self.cset.entries[id].contents_change = contents
291
def change_perms(self, id, base=None, this=None, other=None):
    """Change the permission bits of entry *id* in any of the three trees.

    Each of *base*, *this* and *other* is the new mode for that tree; None
    leaves the tree untouched. When BASE or OTHER changed, the changeset
    entry's `metadata_change` is rebuilt from the modes now on disk in BASE
    and OTHER.
    """
    import stat  # local import keeps the block self-contained

    # Octal 777: rwx for user, group and other. The original masked with
    # 077, which silently dropped the owner bits and disagreed with the
    # 0777 mask used by PermissionsMerge and by the tests.
    perm_mask = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO

    if base is not None:
        self.change_perms_tree(id, self.base, base)
    if this is not None:
        self.change_perms_tree(id, self.this, this)
    if other is not None:
        self.change_perms_tree(id, self.other, other)
    if base is not None or other is not None:
        old_perms = os.stat(self.base.full_path(id)).st_mode & perm_mask
        new_perms = os.stat(self.other.full_path(id)).st_mode & perm_mask
        contents = changeset.ChangeUnixPermissions(old_perms,
                                                   new_perms)
        self.cset.entries[id].metadata_change = contents
308
def change_name_tree(self, id, tree, name):
    """Rename entry *id* to *name* inside *tree*.

    The new path is built from the parent currently recorded for the entry
    in the changeset.
    """
    new_path = tree.child_path(self.cset.entries[id].parent, name)
    tree.change_path(id, new_path)
312
def change_parent_tree(self, id, tree, parent):
    """Move entry *id* under *parent* inside *tree*.

    The new path is built from the name currently recorded for the entry
    in the changeset.
    """
    new_path = tree.child_path(parent, self.cset.entries[id].name)
    tree.change_path(id, new_path)
316
def change_contents_tree(self, id, tree, contents):
    """Overwrite the file for entry *id* in *tree* with *contents*.

    :param id: id of an existing file entry in *tree*
    :param tree: object providing full_path(id)
    :param contents: new contents, written in text mode
    """
    path = tree.full_path(id)
    mode = os.stat(path).st_mode
    # Close the handle explicitly so the data is on disk when callers read
    # the file back.
    out = open(path, "w")
    try:
        out.write(contents)
    finally:
        out.close()
    # The mode was captured before the write but never used in the lines
    # visible to review; re-applying it guarantees the rewrite leaves the
    # entry's permissions as they were.
    os.chmod(path, mode)
322
def change_perms_tree(self, id, tree, mode):
    """Apply *mode* with os.chmod to the file or directory for *id* in *tree*."""
    os.chmod(tree.full_path(id), mode)
325
def merge_changeset(self):
    """Return the merge changeset computed from the builder's three trees.

    Wraps the three tree inventories in a ThreewayInventory and passes
    them, the builder's changeset and an ExceptionConflictHandler rooted at
    the THIS directory to make_merge_changeset().
    """
    all_inventory = ThreewayInventory(Inventory(self.this.inventory),
                                      Inventory(self.base.inventory),
                                      Inventory(self.other.inventory))
    conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
    # NOTE(review): the trees are passed as directory strings here, while
    # make_merge_changeset() reads `this.root` and get_merge_entry() calls
    # `base.readonly_path(...)`; confirm which form those functions expect.
    return make_merge_changeset(self.cset, all_inventory, self.this.dir,
                                self.base.dir, self.other.dir,
                                conflict_handler)
333
def apply_changeset(self, cset, conflict_handler=None, reverse=False):
    """Apply *cset* to the THIS tree and store the inventory it returns.

    :param cset: changeset to apply
    :param conflict_handler: forwarded to changeset.apply_changeset
    :param reverse: forwarded to changeset.apply_changeset
    """
    self.this.inventory = \
        changeset.apply_changeset(cset, self.this.inventory,
                                  self.this.dir, conflict_handler,
                                  reverse)
340
# NOTE(review): the `def` line owning this statement is not visible (the
# interleaved original numbering jumps from 336 to 340). It removes the
# directory tree at self.dir -- presumably the builder's teardown; confirm.
shutil.rmtree(self.dir)
343
class MergeTest(unittest.TestCase):
# Unit tests that build small BASE/THIS/OTHER trees with MergeBuilder and
# check the changeset produced by merge_changeset().
# NOTE(review): the bare integers interleaved with the code look like line
# numbers of the original file, and they skip values (366 -> 368,
# 372 -> 375, 435 -> 439, ...). Statements between scenarios and some call
# continuations are not visible here -- confirm against the complete file.
344
def test_change_name(self):
346
# File 1 is renamed only in OTHER, file 2 only in BASE, file 3 only in
# THIS; only the OTHER rename is expected in the merged changeset.
builder = MergeBuilder()
347
builder.add_file("1", "0", "name1", "hello1", 0755)
348
builder.change_name("1", other="name2")
349
builder.add_file("2", "0", "name3", "hello2", 0755)
350
builder.change_name("2", base="name4")
351
builder.add_file("3", "0", "name5", "hello3", 0755)
352
builder.change_name("3", this="name6")
353
cset = builder.merge_changeset()
354
assert(cset.entries["2"].is_boring())
355
assert(cset.entries["1"].name == "name1")
356
assert(cset.entries["1"].new_name == "name2")
357
assert(cset.entries["3"].is_boring())
358
# Each tree must live below the builder directory, and inventory paths
# must be relative to the tree and exist on disk.
for tree in (builder.this, builder.other, builder.base):
359
assert(tree.dir != builder.dir and
360
tree.dir.startswith(builder.dir))
361
for path in tree.inventory.itervalues():
362
fullpath = tree.abs_path(path)
363
assert(fullpath.startswith(tree.dir))
364
assert(not path.startswith(tree.dir))
365
assert os.path.exists(fullpath)
366
builder.apply_changeset(cset)
368
# Different renames in THIS and OTHER must raise RenameConflict.
builder = MergeBuilder()
369
builder.add_file("1", "0", "name1", "hello1", 0644)
370
builder.change_name("1", other="name2", this="name3")
371
self.assertRaises(changeset.RenameConflict,
372
builder.merge_changeset)
375
def test_file_moves(self):
377
# File 3 is moved only in OTHER, file 4 only in THIS, file 5 only in BASE;
# only the OTHER move is expected in the merged changeset.
builder = MergeBuilder()
378
builder.add_dir("1", "0", "dir1", 0755)
379
builder.add_dir("2", "0", "dir2", 0755)
380
builder.add_file("3", "1", "file1", "hello1", 0644)
381
builder.add_file("4", "1", "file2", "hello2", 0644)
382
builder.add_file("5", "1", "file3", "hello3", 0644)
383
builder.change_parent("3", other="2")
384
assert(Inventory(builder.other.inventory).get_parent("3") == "2")
385
builder.change_parent("4", this="2")
386
assert(Inventory(builder.this.inventory).get_parent("4") == "2")
387
builder.change_parent("5", base="2")
388
assert(Inventory(builder.base.inventory).get_parent("5") == "2")
389
cset = builder.merge_changeset()
390
for id in ("1", "2", "4", "5"):
391
assert(cset.entries[id].is_boring())
392
assert(cset.entries["3"].parent == "1")
393
assert(cset.entries["3"].new_parent == "2")
394
builder.apply_changeset(cset)
397
# Different moves in THIS and OTHER must raise MoveConflict.
builder = MergeBuilder()
398
builder.add_dir("1", "0", "dir1", 0755)
399
builder.add_dir("2", "0", "dir2", 0755)
400
builder.add_dir("3", "0", "dir3", 0755)
401
builder.add_file("4", "1", "file1", "hello1", 0644)
402
builder.change_parent("4", other="2", this="3")
403
self.assertRaises(changeset.MoveConflict,
404
builder.merge_changeset)
407
def test_contents_merge(self):
408
"""Test diff3 merging"""
409
# Contents change in OTHER (file 1), BASE (file 2) or THIS (file 3) only.
builder = MergeBuilder()
410
builder.add_file("1", "0", "name1", "text1", 0755)
411
builder.change_contents("1", other="text4")
412
builder.add_file("2", "0", "name3", "text2", 0655)
413
builder.change_contents("2", base="text5")
414
builder.add_file("3", "0", "name5", "text3", 0744)
415
builder.change_contents("3", this="text6")
416
cset = builder.merge_changeset()
417
assert(cset.entries["1"].contents_change is not None)
418
assert(isinstance(cset.entries["1"].contents_change,
419
changeset.Diff3Merge))
420
assert(isinstance(cset.entries["2"].contents_change,
421
changeset.Diff3Merge))
422
assert(cset.entries["3"].is_boring())
423
builder.apply_changeset(cset)
424
# THIS takes OTHER's text for file 1, keeps its own text for file 2, and
# every file keeps the mode it was created with.
assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
425
assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
426
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
427
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
428
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
431
# Different contents in THIS and OTHER: applying must raise MergeConflict.
builder = MergeBuilder()
432
builder.add_file("1", "0", "name1", "text1", 0755)
433
builder.change_contents("1", other="text4", this="text3")
434
cset = builder.merge_changeset()
435
self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
439
def test_perms_merge(self):
440
# Mode change in OTHER (file 1), BASE (file 2) or THIS (file 3) only.
builder = MergeBuilder()
441
builder.add_file("1", "0", "name1", "text1", 0755)
442
builder.change_perms("1", other=0655)
443
builder.add_file("2", "0", "name2", "text2", 0755)
444
builder.change_perms("2", base=0655)
445
builder.add_file("3", "0", "name3", "text3", 0755)
446
builder.change_perms("3", this=0655)
447
cset = builder.merge_changeset()
448
assert(cset.entries["1"].metadata_change is not None)
449
assert(isinstance(cset.entries["1"].metadata_change,
451
assert(isinstance(cset.entries["2"].metadata_change,
453
assert(cset.entries["3"].is_boring())
454
builder.apply_changeset(cset)
455
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
456
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
457
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
459
# BASE and OTHER both differ from THIS and from each other: applying must
# raise MergePermissionConflict.
builder = MergeBuilder()
460
builder.add_file("1", "0", "name1", "text1", 0755)
461
builder.change_perms("1", other=0655, base=0555)
462
cset = builder.merge_changeset()
463
self.assertRaises(changeset.MergePermissionConflict,
464
builder.apply_changeset, cset)
468
# NOTE(review): the `def` line owning these statements is not visible (the
# interleaved original numbering jumps from 464 to 468) -- confirm.
# Collect every MergeTest method whose name starts with 'test_' and run
# the suite with the plain-text runner.
changeset_suite = unittest.makeSuite(MergeTest, 'test_')
469
runner = unittest.TextTestRunner()
470
runner.run(changeset_suite)
472
if __name__ == "__main__":