4
from bzrlib.selftest import InTempDir, TestCase
5
from bzrlib.branch import ScratchBranch, Branch
6
from bzrlib.errors import NotBranchError, NotVersionedError
12
from bzrlib.inventory import InventoryEntry, RootEntry
13
from bzrlib.osutils import file_kind
14
from bzrlib import changeset
15
from bzrlib.merge_core import (ApplyMerge3, make_merge_changeset,
16
BackupBeforeChange, PermissionsMerge)
17
from bzrlib.changeset import Inventory, apply_changeset, invert_dict
19
class FalseTree(object):
20
def __init__(self, realtree):
21
self._realtree = realtree
24
def __getitem__(self, file_id):
25
entry = self.make_inventory_entry(file_id)
27
raise KeyError(file_id)
30
def make_inventory_entry(self, file_id):
31
path = self._realtree.inventory.get(file_id)
35
return RootEntry(file_id)
36
dir, name = os.path.split(path)
37
kind = file_kind(self._realtree.abs_path(path))
38
for parent_id, path in self._realtree.inventory.iteritems():
42
raise Exception("Can't find parent for %s" % name)
43
return InventoryEntry(file_id, name, kind, parent_id)
46
class MergeTree(object):
47
def __init__(self, dir):
50
self.inventory = {'0': ""}
51
self.tree = FalseTree(self)
53
def child_path(self, parent, name):
54
return os.path.join(self.inventory[parent], name)
56
def add_file(self, id, parent, name, contents, mode):
57
path = self.child_path(parent, name)
58
full_path = self.abs_path(path)
59
assert not os.path.exists(full_path)
60
file(full_path, "wb").write(contents)
61
os.chmod(self.abs_path(path), mode)
62
self.inventory[id] = path
64
def remove_file(self, id):
65
os.unlink(self.full_path(id))
66
del self.inventory[id]
68
def add_dir(self, id, parent, name, mode):
69
path = self.child_path(parent, name)
70
full_path = self.abs_path(path)
71
assert not os.path.exists(full_path)
72
os.mkdir(self.abs_path(path))
73
os.chmod(self.abs_path(path), mode)
74
self.inventory[id] = path
76
def abs_path(self, path):
77
return os.path.join(self.dir, path)
79
def full_path(self, id):
81
tree_path = self.inventory[id]
84
return self.abs_path(tree_path)
86
def readonly_path(self, id):
87
return self.full_path(id)
89
def __contains__(self, file_id):
90
return file_id in self.inventory
92
def has_or_had_id(self, file_id):
93
return file_id in self
95
def get_file(self, file_id):
96
path = self.readonly_path(file_id)
97
return file(path, "rb")
99
def id2path(self, file_id):
100
return self.inventory[file_id]
102
def change_path(self, id, path):
103
new = os.path.join(self.dir, self.inventory[id])
104
os.rename(self.abs_path(self.inventory[id]), self.abs_path(path))
105
self.inventory[id] = path
108
class MergeBuilder(object):
110
self.dir = tempfile.mkdtemp(prefix="BaZing")
111
self.base = MergeTree(os.path.join(self.dir, "base"))
112
self.this = MergeTree(os.path.join(self.dir, "this"))
113
self.other = MergeTree(os.path.join(self.dir, "other"))
115
self.cset = changeset.Changeset()
116
self.cset.add_entry(changeset.ChangesetEntry("0",
117
changeset.NULL_ID, "./."))
118
def get_cset_path(self, parent, name):
120
assert (parent is None)
122
return os.path.join(self.cset.entries[parent].path, name)
124
def add_file(self, id, parent, name, contents, mode):
125
self.base.add_file(id, parent, name, contents, mode)
126
self.this.add_file(id, parent, name, contents, mode)
127
self.other.add_file(id, parent, name, contents, mode)
128
path = self.get_cset_path(parent, name)
129
self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
131
def remove_file(self, id, base=False, this=False, other=False):
132
for option, tree in ((base, self.base), (this, self.this),
133
(other, self.other)):
137
change = self.cset.entries[id].contents_change
139
change = changeset.ReplaceContents(None, None)
140
self.cset.entries[id].contents_change = change
141
def create_file(tree):
142
return changeset.FileCreate(tree.get_file(id).read())
144
change.new_contents = create_file(self.other)
146
change.old_contents = create_file(self.base)
148
assert isinstance(change, changeset.ReplaceContents)
150
change.new_contents=None
152
change.old_contents=None
153
if change.old_contents is None and change.new_contents is None:
157
def add_dir(self, id, parent, name, mode):
158
path = self.get_cset_path(parent, name)
159
self.base.add_dir(id, parent, name, mode)
160
self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
161
self.this.add_dir(id, parent, name, mode)
162
self.other.add_dir(id, parent, name, mode)
165
def change_name(self, id, base=None, this=None, other=None):
167
self.change_name_tree(id, self.base, base)
168
self.cset.entries[id].name = base
171
self.change_name_tree(id, self.this, this)
173
if other is not None:
174
self.change_name_tree(id, self.other, other)
175
self.cset.entries[id].new_name = other
177
def change_parent(self, id, base=None, this=None, other=None):
179
self.change_parent_tree(id, self.base, base)
180
self.cset.entries[id].parent = base
181
self.cset.entries[id].dir = self.cset.entries[base].path
184
self.change_parent_tree(id, self.this, this)
186
if other is not None:
187
self.change_parent_tree(id, self.other, other)
188
self.cset.entries[id].new_parent = other
189
self.cset.entries[id].new_dir = \
190
self.cset.entries[other].new_path
192
def change_contents(self, id, base=None, this=None, other=None):
194
self.change_contents_tree(id, self.base, base)
197
self.change_contents_tree(id, self.this, this)
199
if other is not None:
200
self.change_contents_tree(id, self.other, other)
202
if base is not None or other is not None:
203
old_contents = file(self.base.full_path(id)).read()
204
new_contents = file(self.other.full_path(id)).read()
205
contents = changeset.ReplaceFileContents(old_contents,
207
self.cset.entries[id].contents_change = contents
209
def change_perms(self, id, base=None, this=None, other=None):
211
self.change_perms_tree(id, self.base, base)
214
self.change_perms_tree(id, self.this, this)
216
if other is not None:
217
self.change_perms_tree(id, self.other, other)
219
if base is not None or other is not None:
220
old_perms = os.stat(self.base.full_path(id)).st_mode &077
221
new_perms = os.stat(self.other.full_path(id)).st_mode &077
222
contents = changeset.ChangeUnixPermissions(old_perms,
224
self.cset.entries[id].metadata_change = contents
226
def change_name_tree(self, id, tree, name):
227
new_path = tree.child_path(self.cset.entries[id].parent, name)
228
tree.change_path(id, new_path)
230
def change_parent_tree(self, id, tree, parent):
231
new_path = tree.child_path(parent, self.cset.entries[id].name)
232
tree.change_path(id, new_path)
234
def change_contents_tree(self, id, tree, contents):
235
path = tree.full_path(id)
236
mode = os.stat(path).st_mode
237
file(path, "w").write(contents)
240
def change_perms_tree(self, id, tree, mode):
241
os.chmod(tree.full_path(id), mode)
243
def merge_changeset(self, merge_factory):
244
conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
245
return make_merge_changeset(self.cset, self.this, self.base,
246
self.other, conflict_handler,
249
def apply_inv_change(self, inventory_change, orig_inventory):
250
orig_inventory_by_path = {}
251
for file_id, path in orig_inventory.iteritems():
252
orig_inventory_by_path[path] = file_id
254
def parent_id(file_id):
256
parent_dir = os.path.dirname(orig_inventory[file_id])
262
return orig_inventory_by_path[parent_dir]
264
def new_path(file_id):
265
if inventory_change.has_key(file_id):
266
return inventory_change[file_id]
268
parent = parent_id(file_id)
270
return orig_inventory[file_id]
271
dirname = new_path(parent)
272
return os.path.join(dirname, orig_inventory[file_id])
275
for file_id in orig_inventory.iterkeys():
276
path = new_path(file_id)
279
new_inventory[file_id] = path
281
for file_id, path in inventory_change.iteritems():
282
if orig_inventory.has_key(file_id):
284
new_inventory[file_id] = path
287
def apply_changeset(self, cset, conflict_handler=None, reverse=False):
288
inventory_change = changeset.apply_changeset(cset,
291
conflict_handler, reverse)
292
self.this.inventory = self.apply_inv_change(inventory_change,
296
shutil.rmtree(self.dir)
298
class MergeTest(unittest.TestCase):
299
def test_change_name(self):
301
builder = MergeBuilder()
302
builder.add_file("1", "0", "name1", "hello1", 0755)
303
builder.change_name("1", other="name2")
304
builder.add_file("2", "0", "name3", "hello2", 0755)
305
builder.change_name("2", base="name4")
306
builder.add_file("3", "0", "name5", "hello3", 0755)
307
builder.change_name("3", this="name6")
308
cset = builder.merge_changeset(ApplyMerge3)
309
assert(cset.entries["2"].is_boring())
310
assert(cset.entries["1"].name == "name1")
311
assert(cset.entries["1"].new_name == "name2")
312
assert(cset.entries["3"].is_boring())
313
for tree in (builder.this, builder.other, builder.base):
314
assert(tree.dir != builder.dir and
315
tree.dir.startswith(builder.dir))
316
for path in tree.inventory.itervalues():
317
fullpath = tree.abs_path(path)
318
assert(fullpath.startswith(tree.dir))
319
assert(not path.startswith(tree.dir))
320
assert os.path.exists(fullpath)
321
builder.apply_changeset(cset)
323
builder = MergeBuilder()
324
builder.add_file("1", "0", "name1", "hello1", 0644)
325
builder.change_name("1", other="name2", this="name3")
326
self.assertRaises(changeset.RenameConflict,
327
builder.merge_changeset, ApplyMerge3)
330
def test_file_moves(self):
332
builder = MergeBuilder()
333
builder.add_dir("1", "0", "dir1", 0755)
334
builder.add_dir("2", "0", "dir2", 0755)
335
builder.add_file("3", "1", "file1", "hello1", 0644)
336
builder.add_file("4", "1", "file2", "hello2", 0644)
337
builder.add_file("5", "1", "file3", "hello3", 0644)
338
builder.change_parent("3", other="2")
339
assert(Inventory(builder.other.inventory).get_parent("3") == "2")
340
builder.change_parent("4", this="2")
341
assert(Inventory(builder.this.inventory).get_parent("4") == "2")
342
builder.change_parent("5", base="2")
343
assert(Inventory(builder.base.inventory).get_parent("5") == "2")
344
cset = builder.merge_changeset(ApplyMerge3)
345
for id in ("1", "2", "4", "5"):
346
assert(cset.entries[id].is_boring())
347
assert(cset.entries["3"].parent == "1")
348
assert(cset.entries["3"].new_parent == "2")
349
builder.apply_changeset(cset)
352
builder = MergeBuilder()
353
builder.add_dir("1", "0", "dir1", 0755)
354
builder.add_dir("2", "0", "dir2", 0755)
355
builder.add_dir("3", "0", "dir3", 0755)
356
builder.add_file("4", "1", "file1", "hello1", 0644)
357
builder.change_parent("4", other="2", this="3")
358
self.assertRaises(changeset.MoveConflict,
359
builder.merge_changeset, ApplyMerge3)
362
def test_contents_merge(self):
363
"""Test merge3 merging"""
364
self.do_contents_test(ApplyMerge3)
366
def test_contents_merge2(self):
367
"""Test diff3 merging"""
368
self.do_contents_test(changeset.Diff3Merge)
370
def test_contents_merge3(self):
371
"""Test diff3 merging"""
372
def backup_merge(file_id, base, other):
373
return BackupBeforeChange(ApplyMerge3(file_id, base, other))
374
builder = self.contents_test_success(backup_merge)
375
def backup_exists(file_id):
376
return os.path.exists(builder.this.full_path(file_id)+"~")
377
assert backup_exists("1")
378
assert backup_exists("2")
379
assert not backup_exists("3")
382
def do_contents_test(self, merge_factory):
383
"""Test merging with specified ContentsChange factory"""
384
builder = self.contents_test_success(merge_factory)
386
self.contents_test_conflicts(merge_factory)
388
def contents_test_success(self, merge_factory):
389
from inspect import isclass
390
builder = MergeBuilder()
391
builder.add_file("1", "0", "name1", "text1", 0755)
392
builder.change_contents("1", other="text4")
393
builder.add_file("2", "0", "name3", "text2", 0655)
394
builder.change_contents("2", base="text5")
395
builder.add_file("3", "0", "name5", "text3", 0744)
396
builder.add_file("4", "0", "name6", "text4", 0744)
397
builder.remove_file("4", base=True)
398
assert not builder.cset.entries["4"].is_boring()
399
builder.change_contents("3", this="text6")
400
cset = builder.merge_changeset(merge_factory)
401
assert(cset.entries["1"].contents_change is not None)
402
if isclass(merge_factory):
403
assert(isinstance(cset.entries["1"].contents_change,
405
assert(isinstance(cset.entries["2"].contents_change,
407
assert(cset.entries["3"].is_boring())
408
assert(cset.entries["4"].is_boring())
409
builder.apply_changeset(cset)
410
assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
411
assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
412
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
413
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
414
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
417
def contents_test_conflicts(self, merge_factory):
418
builder = MergeBuilder()
419
builder.add_file("1", "0", "name1", "text1", 0755)
420
builder.change_contents("1", other="text4", this="text3")
421
cset = builder.merge_changeset(merge_factory)
422
self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
426
builder = MergeBuilder()
427
builder.add_file("1", "0", "name1", "text1", 0755)
428
builder.change_contents("1", other="text4", base="text3")
429
builder.remove_file("1", base=True)
430
self.assertRaises(changeset.NewContentsConflict,
431
builder.merge_changeset, merge_factory)
434
builder = MergeBuilder()
435
builder.add_file("1", "0", "name1", "text1", 0755)
436
builder.change_contents("1", other="text4", base="text3")
437
builder.remove_file("1", this=True)
438
self.assertRaises(changeset.MissingForMerge, builder.merge_changeset,
442
def test_perms_merge(self):
443
builder = MergeBuilder()
444
builder.add_file("1", "0", "name1", "text1", 0755)
445
builder.change_perms("1", other=0655)
446
builder.add_file("2", "0", "name2", "text2", 0755)
447
builder.change_perms("2", base=0655)
448
builder.add_file("3", "0", "name3", "text3", 0755)
449
builder.change_perms("3", this=0655)
450
cset = builder.merge_changeset(ApplyMerge3)
451
assert(cset.entries["1"].metadata_change is not None)
452
assert(isinstance(cset.entries["1"].metadata_change,
454
assert(isinstance(cset.entries["2"].metadata_change,
456
assert(cset.entries["3"].is_boring())
457
builder.apply_changeset(cset)
458
assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
459
assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
460
assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
462
builder = MergeBuilder()
463
builder.add_file("1", "0", "name1", "text1", 0755)
464
builder.change_perms("1", other=0655, base=0555)
465
cset = builder.merge_changeset(ApplyMerge3)
466
self.assertRaises(changeset.MergePermissionConflict,
467
builder.apply_changeset, cset)