~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/merge_core.py

  • Committer: Martin Pool
  • Date: 2005-05-15 02:36:04 UTC
  • Revision ID: mbp@sourcefrog.net-20050515023603-7328f6cbabd2b09a
- Merge aaron's merge command

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import changeset
 
2
from changeset import Inventory, apply_changeset, invert_dict
 
3
import os.path
 
4
 
 
5
class ThreewayInventory:
 
6
    def __init__(self, this_inventory, base_inventory, other_inventory):
 
7
        self.this = this_inventory
 
8
        self.base = base_inventory
 
9
        self.other = other_inventory
 
10
def invert_invent(inventory):
 
11
    invert_invent = {}
 
12
    for key, value in inventory.iteritems():
 
13
        invert_invent[value.id] = key
 
14
    return invert_invent
 
15
 
 
16
def make_inv(inventory):
 
17
    return Inventory(invert_invent(inventory))
 
18
        
 
19
 
 
20
def merge_flex(this, base, other, changeset_function, inventory_function,
 
21
               conflict_handler):
 
22
    this_inventory = inventory_function(this)
 
23
    base_inventory = inventory_function(base)
 
24
    other_inventory = inventory_function(other)
 
25
    inventory = ThreewayInventory(make_inv(this_inventory),
 
26
                                  make_inv(base_inventory), 
 
27
                                  make_inv(other_inventory))
 
28
    cset = changeset_function(base, other, base_inventory, other_inventory)
 
29
    new_cset = make_merge_changeset(cset, inventory, this, base, other, 
 
30
                                    conflict_handler)
 
31
    return apply_changeset(new_cset, invert_invent(this_inventory), this.root, 
 
32
                           conflict_handler, False)
 
33
 
 
34
    
 
35
 
 
36
def make_merge_changeset(cset, inventory, this, base, other, 
 
37
                         conflict_handler=None):
 
38
    new_cset = changeset.Changeset()
 
39
    def get_this_contents(id):
 
40
        path = os.path.join(this.root, inventory.this.get_path(id))
 
41
        if os.path.isdir(path):
 
42
            return changeset.dir_create
 
43
        else:
 
44
            return changeset.FileCreate(file(path, "rb").read())
 
45
 
 
46
    for entry in cset.entries.itervalues():
 
47
        if entry.is_boring():
 
48
            new_cset.add_entry(entry)
 
49
        elif entry.is_creation(False):
 
50
            if inventory.this.get_path(entry.id) is None:
 
51
                new_cset.add_entry(entry)
 
52
            else:
 
53
                this_contents = get_this_contents(entry.id)
 
54
                other_contents = entry.contents_change.new_contents
 
55
                if other_contents == this_contents:
 
56
                    boring_entry = changeset.ChangesetEntry(entry.id, 
 
57
                                                            entry.new_parent, 
 
58
                                                            entry.new_path)
 
59
                    new_cset.add_entry(boring_entry)
 
60
                else:
 
61
                    conflict_handler.contents_conflict(this_contents, 
 
62
                                                       other_contents)
 
63
 
 
64
        elif entry.is_deletion(False):
 
65
            if inventory.this.get_path(entry.id) is None:
 
66
                boring_entry = changeset.ChangesetEntry(entry.id, entry.parent, 
 
67
                                                        entry.path)
 
68
                new_cset.add_entry(boring_entry)
 
69
            elif entry.contents_change is not None:
 
70
                this_contents = get_this_contents(entry.id) 
 
71
                base_contents = entry.contents_change.old_contents
 
72
                if base_contents == this_contents:
 
73
                    new_cset.add_entry(entry)
 
74
                else:
 
75
                    entry_path = inventory.this.get_path(entry.id)
 
76
                    conflict_handler.rem_contents_conflict(entry_path,
 
77
                                                           this_contents, 
 
78
                                                           base_contents)
 
79
 
 
80
            else:
 
81
                new_cset.add_entry(entry)
 
82
        else:
 
83
            entry = get_merge_entry(entry, inventory, base, other, 
 
84
                                    conflict_handler)
 
85
            if entry is not None:
 
86
                new_cset.add_entry(entry)
 
87
    return new_cset
 
88
 
 
89
 
 
90
def get_merge_entry(entry, inventory, base, other, conflict_handler):
 
91
    this_name = inventory.this.get_name(entry.id)
 
92
    this_parent = inventory.this.get_parent(entry.id)
 
93
    this_dir = inventory.this.get_dir(entry.id)
 
94
    if this_dir is None:
 
95
        this_dir = ""
 
96
    if this_name is None:
 
97
        return conflict_handler.merge_missing(entry.id, inventory)
 
98
 
 
99
    base_name = inventory.base.get_name(entry.id)
 
100
    base_parent = inventory.base.get_parent(entry.id)
 
101
    base_dir = inventory.base.get_dir(entry.id)
 
102
    if base_dir is None:
 
103
        base_dir = ""
 
104
    other_name = inventory.other.get_name(entry.id)
 
105
    other_parent = inventory.other.get_parent(entry.id)
 
106
    other_dir = inventory.base.get_dir(entry.id)
 
107
    if other_dir is None:
 
108
        other_dir = ""
 
109
 
 
110
    if base_name == other_name:
 
111
        old_name = this_name
 
112
        new_name = this_name
 
113
    else:
 
114
        if this_name != base_name and this_name != other_name:
 
115
            conflict_handler.rename_conflict(entry.id, this_name, base_name,
 
116
                                             other_name)
 
117
        else:
 
118
            old_name = this_name
 
119
            new_name = other_name
 
120
 
 
121
    if base_parent == other_parent:
 
122
        old_parent = this_parent
 
123
        new_parent = this_parent
 
124
        old_dir = this_dir
 
125
        new_dir = this_dir
 
126
    else:
 
127
        if this_parent != base_parent and this_parent != other_parent:
 
128
            conflict_handler.move_conflict(entry.id, inventory)
 
129
        else:
 
130
            old_parent = this_parent
 
131
            old_dir = this_dir
 
132
            new_parent = other_parent
 
133
            new_dir = other_dir
 
134
    old_path = os.path.join(old_dir, old_name)
 
135
    new_entry = changeset.ChangesetEntry(entry.id, old_parent, old_name)
 
136
    if new_name is not None or new_parent is not None:
 
137
        new_entry.new_path = os.path.join(new_dir, new_name)
 
138
    else:
 
139
        new_entry.new_path = None
 
140
    new_entry.new_parent = new_parent
 
141
 
 
142
    base_path = base.readonly_path(entry.id)
 
143
    other_path = other.readonly_path(entry.id)
 
144
    
 
145
    if entry.contents_change is not None:
 
146
        new_entry.contents_change = changeset.Diff3Merge(base_path, other_path)
 
147
    if entry.metadata_change is not None:
 
148
        new_entry.metadata_change = PermissionsMerge(base_path, other_path)
 
149
 
 
150
    return new_entry
 
151
 
 
152
class PermissionsMerge:
 
153
    def __init__(self, base_path, other_path):
 
154
        self.base_path = base_path
 
155
        self.other_path = other_path
 
156
 
 
157
    def apply(self, filename, conflict_handler, reverse=False):
 
158
        if not reverse:
 
159
            base = self.base_path
 
160
            other = self.other_path
 
161
        else:
 
162
            base = self.other_path
 
163
            other = self.base_path
 
164
        base_stat = os.stat(base).st_mode
 
165
        other_stat = os.stat(other).st_mode
 
166
        this_stat = os.stat(filename).st_mode
 
167
        if base_stat &0777 == other_stat &0777:
 
168
            return
 
169
        elif this_stat &0777 == other_stat &0777:
 
170
            return
 
171
        elif this_stat &0777 == base_stat &0777:
 
172
            os.chmod(filename, other_stat)
 
173
        else:
 
174
            conflict_handler.permission_conflict(filename, base, other)
 
175
 
 
176
 
 
177
import unittest
 
178
import tempfile
 
179
import shutil
 
180
class MergeTree:
 
181
    def __init__(self, dir):
 
182
        self.dir = dir;
 
183
        os.mkdir(dir)
 
184
        self.inventory = {'0': ""}
 
185
    
 
186
    def child_path(self, parent, name):
 
187
        return os.path.join(self.inventory[parent], name)
 
188
 
 
189
    def add_file(self, id, parent, name, contents, mode):
 
190
        path = self.child_path(parent, name)
 
191
        full_path = self.abs_path(path)
 
192
        assert not os.path.exists(full_path)
 
193
        file(full_path, "wb").write(contents)
 
194
        os.chmod(self.abs_path(path), mode)
 
195
        self.inventory[id] = path
 
196
 
 
197
    def add_dir(self, id, parent, name, mode):
 
198
        path = self.child_path(parent, name)
 
199
        full_path = self.abs_path(path)
 
200
        assert not os.path.exists(full_path)
 
201
        os.mkdir(self.abs_path(path))
 
202
        os.chmod(self.abs_path(path), mode)
 
203
        self.inventory[id] = path
 
204
 
 
205
    def abs_path(self, path):
 
206
        return os.path.join(self.dir, path)
 
207
 
 
208
    def full_path(self, id):
 
209
        return self.abs_path(self.inventory[id])
 
210
 
 
211
    def change_path(self, id, path):
 
212
        new = os.path.join(self.dir, self.inventory[id])
 
213
        os.rename(self.abs_path(self.inventory[id]), self.abs_path(path))
 
214
        self.inventory[id] = path
 
215
 
 
216
class MergeBuilder:
 
217
    def __init__(self):
 
218
        self.dir = tempfile.mkdtemp(prefix="BaZing")
 
219
        self.base = MergeTree(os.path.join(self.dir, "base"))
 
220
        self.this = MergeTree(os.path.join(self.dir, "this"))
 
221
        self.other = MergeTree(os.path.join(self.dir, "other"))
 
222
        
 
223
        self.cset = changeset.Changeset()
 
224
        self.cset.add_entry(changeset.ChangesetEntry("0", 
 
225
                                                     changeset.NULL_ID, "./."))
 
226
    def get_cset_path(self, parent, name):
 
227
        if name is None:
 
228
            assert (parent is None)
 
229
            return None
 
230
        return os.path.join(self.cset.entries[parent].path, name)
 
231
 
 
232
    def add_file(self, id, parent, name, contents, mode):
 
233
        self.base.add_file(id, parent, name, contents, mode)
 
234
        self.this.add_file(id, parent, name, contents, mode)
 
235
        self.other.add_file(id, parent, name, contents, mode)
 
236
        path = self.get_cset_path(parent, name)
 
237
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
 
238
 
 
239
    def add_dir(self, id, parent, name, mode):
 
240
        path = self.get_cset_path(parent, name)
 
241
        self.base.add_dir(id, parent, name, mode)
 
242
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
 
243
        self.this.add_dir(id, parent, name, mode)
 
244
        self.other.add_dir(id, parent, name, mode)
 
245
 
 
246
 
 
247
    def change_name(self, id, base=None, this=None, other=None):
 
248
        if base is not None:
 
249
            self.change_name_tree(id, self.base, base)
 
250
            self.cset.entries[id].name = base
 
251
 
 
252
        if this is not None:
 
253
            self.change_name_tree(id, self.this, this)
 
254
 
 
255
        if other is not None:
 
256
            self.change_name_tree(id, self.other, other)
 
257
            self.cset.entries[id].new_name = other
 
258
 
 
259
    def change_parent(self, id, base=None, this=None, other=None):
 
260
        if base is not None:
 
261
            self.change_parent_tree(id, self.base, base)
 
262
            self.cset.entries[id].parent = base
 
263
            self.cset.entries[id].dir = self.cset.entries[base].path
 
264
 
 
265
        if this is not None:
 
266
            self.change_parent_tree(id, self.this, this)
 
267
 
 
268
        if other is not None:
 
269
            self.change_parent_tree(id, self.other, other)
 
270
            self.cset.entries[id].new_parent = other
 
271
            self.cset.entries[id].new_dir = \
 
272
                self.cset.entries[other].new_path
 
273
 
 
274
    def change_contents(self, id, base=None, this=None, other=None):
 
275
        if base is not None:
 
276
            self.change_contents_tree(id, self.base, base)
 
277
 
 
278
        if this is not None:
 
279
            self.change_contents_tree(id, self.this, this)
 
280
 
 
281
        if other is not None:
 
282
            self.change_contents_tree(id, self.other, other)
 
283
 
 
284
        if base is not None or other is not None:
 
285
            old_contents = file(self.base.full_path(id)).read()
 
286
            new_contents = file(self.other.full_path(id)).read()
 
287
            contents = changeset.ReplaceFileContents(old_contents, 
 
288
                                                     new_contents)
 
289
            self.cset.entries[id].contents_change = contents
 
290
 
 
291
    def change_perms(self, id, base=None, this=None, other=None):
 
292
        if base is not None:
 
293
            self.change_perms_tree(id, self.base, base)
 
294
 
 
295
        if this is not None:
 
296
            self.change_perms_tree(id, self.this, this)
 
297
 
 
298
        if other is not None:
 
299
            self.change_perms_tree(id, self.other, other)
 
300
 
 
301
        if base is not None or other is not None:
 
302
            old_perms = os.stat(self.base.full_path(id)).st_mode &077
 
303
            new_perms = os.stat(self.other.full_path(id)).st_mode &077
 
304
            contents = changeset.ChangeUnixPermissions(old_perms, 
 
305
                                                       new_perms)
 
306
            self.cset.entries[id].metadata_change = contents
 
307
 
 
308
    def change_name_tree(self, id, tree, name):
 
309
        new_path = tree.child_path(self.cset.entries[id].parent, name)
 
310
        tree.change_path(id, new_path)
 
311
 
 
312
    def change_parent_tree(self, id, tree, parent):
 
313
        new_path = tree.child_path(parent, self.cset.entries[id].name)
 
314
        tree.change_path(id, new_path)
 
315
 
 
316
    def change_contents_tree(self, id, tree, contents):
 
317
        path = tree.full_path(id)
 
318
        mode = os.stat(path).st_mode
 
319
        file(path, "w").write(contents)
 
320
        os.chmod(path, mode)
 
321
 
 
322
    def change_perms_tree(self, id, tree, mode):
 
323
        os.chmod(tree.full_path(id), mode)
 
324
 
 
325
    def merge_changeset(self):
 
326
        all_inventory = ThreewayInventory(Inventory(self.this.inventory),
 
327
                                          Inventory(self.base.inventory), 
 
328
                                          Inventory(self.other.inventory))
 
329
        conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
 
330
        return make_merge_changeset(self.cset, all_inventory, self.this.dir,
 
331
                                    self.base.dir, self.other.dir, 
 
332
                                    conflict_handler)
 
333
    def apply_changeset(self, cset, conflict_handler=None, reverse=False):
 
334
        self.this.inventory = \
 
335
            changeset.apply_changeset(cset, self.this.inventory,
 
336
                                      self.this.dir, conflict_handler,
 
337
                                      reverse)
 
338
        
 
339
    def cleanup(self):
 
340
        shutil.rmtree(self.dir)
 
341
 
 
342
 
 
343
class MergeTest(unittest.TestCase):
 
344
    def test_change_name(self):
 
345
        """Test renames"""
 
346
        builder = MergeBuilder()
 
347
        builder.add_file("1", "0", "name1", "hello1", 0755)
 
348
        builder.change_name("1", other="name2")
 
349
        builder.add_file("2", "0", "name3", "hello2", 0755)
 
350
        builder.change_name("2", base="name4")
 
351
        builder.add_file("3", "0", "name5", "hello3", 0755)
 
352
        builder.change_name("3", this="name6")
 
353
        cset = builder.merge_changeset()
 
354
        assert(cset.entries["2"].is_boring())
 
355
        assert(cset.entries["1"].name == "name1")
 
356
        assert(cset.entries["1"].new_name == "name2")
 
357
        assert(cset.entries["3"].is_boring())
 
358
        for tree in (builder.this, builder.other, builder.base):
 
359
            assert(tree.dir != builder.dir and 
 
360
                   tree.dir.startswith(builder.dir))
 
361
            for path in tree.inventory.itervalues():
 
362
                fullpath = tree.abs_path(path)
 
363
                assert(fullpath.startswith(tree.dir))
 
364
                assert(not path.startswith(tree.dir))
 
365
                assert os.path.exists(fullpath)
 
366
        builder.apply_changeset(cset)
 
367
        builder.cleanup()
 
368
        builder = MergeBuilder()
 
369
        builder.add_file("1", "0", "name1", "hello1", 0644)
 
370
        builder.change_name("1", other="name2", this="name3")
 
371
        self.assertRaises(changeset.RenameConflict, 
 
372
                          builder.merge_changeset)
 
373
        builder.cleanup()
 
374
        
 
375
    def test_file_moves(self):
 
376
        """Test moves"""
 
377
        builder = MergeBuilder()
 
378
        builder.add_dir("1", "0", "dir1", 0755)
 
379
        builder.add_dir("2", "0", "dir2", 0755)
 
380
        builder.add_file("3", "1", "file1", "hello1", 0644)
 
381
        builder.add_file("4", "1", "file2", "hello2", 0644)
 
382
        builder.add_file("5", "1", "file3", "hello3", 0644)
 
383
        builder.change_parent("3", other="2")
 
384
        assert(Inventory(builder.other.inventory).get_parent("3") == "2")
 
385
        builder.change_parent("4", this="2")
 
386
        assert(Inventory(builder.this.inventory).get_parent("4") == "2")
 
387
        builder.change_parent("5", base="2")
 
388
        assert(Inventory(builder.base.inventory).get_parent("5") == "2")
 
389
        cset = builder.merge_changeset()
 
390
        for id in ("1", "2", "4", "5"):
 
391
            assert(cset.entries[id].is_boring())
 
392
        assert(cset.entries["3"].parent == "1")
 
393
        assert(cset.entries["3"].new_parent == "2")
 
394
        builder.apply_changeset(cset)
 
395
        builder.cleanup()
 
396
 
 
397
        builder = MergeBuilder()
 
398
        builder.add_dir("1", "0", "dir1", 0755)
 
399
        builder.add_dir("2", "0", "dir2", 0755)
 
400
        builder.add_dir("3", "0", "dir3", 0755)
 
401
        builder.add_file("4", "1", "file1", "hello1", 0644)
 
402
        builder.change_parent("4", other="2", this="3")
 
403
        self.assertRaises(changeset.MoveConflict, 
 
404
                          builder.merge_changeset)
 
405
        builder.cleanup()
 
406
 
 
407
    def test_contents_merge(self):
 
408
        """Test diff3 merging"""
 
409
        builder = MergeBuilder()
 
410
        builder.add_file("1", "0", "name1", "text1", 0755)
 
411
        builder.change_contents("1", other="text4")
 
412
        builder.add_file("2", "0", "name3", "text2", 0655)
 
413
        builder.change_contents("2", base="text5")
 
414
        builder.add_file("3", "0", "name5", "text3", 0744)
 
415
        builder.change_contents("3", this="text6")
 
416
        cset = builder.merge_changeset()
 
417
        assert(cset.entries["1"].contents_change is not None)
 
418
        assert(isinstance(cset.entries["1"].contents_change,
 
419
                          changeset.Diff3Merge))
 
420
        assert(isinstance(cset.entries["2"].contents_change,
 
421
                          changeset.Diff3Merge))
 
422
        assert(cset.entries["3"].is_boring())
 
423
        builder.apply_changeset(cset)
 
424
        assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
 
425
        assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
 
426
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
 
427
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
 
428
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
 
429
        builder.cleanup()
 
430
 
 
431
        builder = MergeBuilder()
 
432
        builder.add_file("1", "0", "name1", "text1", 0755)
 
433
        builder.change_contents("1", other="text4", this="text3")
 
434
        cset = builder.merge_changeset()
 
435
        self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
 
436
                          cset)
 
437
        builder.cleanup()
 
438
 
 
439
    def test_perms_merge(self):
 
440
        builder = MergeBuilder()
 
441
        builder.add_file("1", "0", "name1", "text1", 0755)
 
442
        builder.change_perms("1", other=0655)
 
443
        builder.add_file("2", "0", "name2", "text2", 0755)
 
444
        builder.change_perms("2", base=0655)
 
445
        builder.add_file("3", "0", "name3", "text3", 0755)
 
446
        builder.change_perms("3", this=0655)
 
447
        cset = builder.merge_changeset()
 
448
        assert(cset.entries["1"].metadata_change is not None)
 
449
        assert(isinstance(cset.entries["1"].metadata_change,
 
450
                          PermissionsMerge))
 
451
        assert(isinstance(cset.entries["2"].metadata_change,
 
452
                          PermissionsMerge))
 
453
        assert(cset.entries["3"].is_boring())
 
454
        builder.apply_changeset(cset)
 
455
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
 
456
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
 
457
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
 
458
        builder.cleanup();
 
459
        builder = MergeBuilder()
 
460
        builder.add_file("1", "0", "name1", "text1", 0755)
 
461
        builder.change_perms("1", other=0655, base=0555)
 
462
        cset = builder.merge_changeset()
 
463
        self.assertRaises(changeset.MergePermissionConflict, 
 
464
                     builder.apply_changeset, cset)
 
465
        builder.cleanup()
 
466
 
 
467
def test():        
 
468
    changeset_suite = unittest.makeSuite(MergeTest, 'test_')
 
469
    runner = unittest.TextTestRunner()
 
470
    runner.run(changeset_suite)
 
471
        
 
472
if __name__ == "__main__":
 
473
    test()