~bzr-pqm/bzr/bzr.dev

493 by Martin Pool
- Merge aaron's merge command
1
import changeset
2
from changeset import Inventory, apply_changeset, invert_dict
3
import os.path
4
558 by Martin Pool
- All top-level classes inherit from object
5
class ThreewayInventory(object):
    """Hold the THIS, BASE and OTHER inventories of a three-way merge.

    The three constructor arguments are stored unchanged as the ``this``,
    ``base`` and ``other`` attributes.
    """

    def __init__(self, this_inventory, base_inventory, other_inventory):
        self.this, self.base, self.other = (
            this_inventory, base_inventory, other_inventory)
10
def invert_invent(inventory):
    """Return a dict mapping each value's ``id`` attribute to its key.

    ``inventory`` must provide ``iteritems()`` yielding ``(key, value)``
    pairs whose values expose an ``id`` attribute.  When several values
    share an id, the pair yielded last wins.
    """
    return dict((item.id, name) for name, item in inventory.iteritems())
15
16
def make_inv(inventory):
    """Wrap the id-to-key inversion of ``inventory`` in an ``Inventory``."""
    by_id = invert_invent(inventory)
    return Inventory(by_id)
18
        
19
20
def merge_flex(this, base, other, changeset_function, inventory_function,
               conflict_handler):
    """Merge the BASE -> OTHER changes into THIS and return the result.

    ``inventory_function`` is called on ``this``, ``base`` and ``other`` (in
    that order) to obtain their inventories.  ``changeset_function`` is
    called as ``changeset_function(base, other, base_inventory,
    other_inventory)`` to obtain the raw changeset, which is then turned
    into a merge changeset and applied under ``this.root``.
    ``conflict_handler.finalize()`` is called once the changeset has been
    applied; the value returned by ``apply_changeset`` is returned.
    """
    this_inventory, base_inventory, other_inventory = [
        inventory_function(tree) for tree in (this, base, other)]
    threeway = ThreewayInventory(
        *[make_inv(inv)
          for inv in (this_inventory, base_inventory, other_inventory)])
    base_to_other = changeset_function(base, other, base_inventory,
                                       other_inventory)
    merged_cset = make_merge_changeset(base_to_other, threeway, this, base,
                                       other, conflict_handler)
    this_by_id = invert_invent(this_inventory)
    outcome = apply_changeset(merged_cset, this_by_id, this.root,
                              conflict_handler, False)
    conflict_handler.finalize()
    return outcome
493 by Martin Pool
- Merge aaron's merge command
35
36
    
37
38
def make_merge_changeset(cset, inventory, this, base, other,
                         conflict_handler=None):
    """Build the changeset that merges ``cset`` into the THIS tree.

    Every entry of ``cset.entries`` is examined in turn:

    * boring entries are copied unchanged;
    * creations are copied when THIS has no path for the id; when THIS
      already has matching contents a contents-free entry is added instead,
      and differing contents are reported via
      ``conflict_handler.contents_conflict``;
    * deletions become a contents-free entry when THIS has no path for the
      id; when the entry carries a contents change it is copied only if THIS
      still matches the old contents, otherwise
      ``conflict_handler.rem_contents_conflict`` is called;
    * everything else is delegated to ``get_merge_entry``; a ``None`` result
      is skipped.

    :param cset: changeset whose ``entries`` are merged.
    :param inventory: object exposing ``this`` with a ``get_path(id)``
        method; also passed on to ``get_merge_entry``.
    :param this: object whose ``root`` attribute is the directory of THIS.
    :param base: passed through to ``get_merge_entry``.
    :param other: passed through to ``get_merge_entry``.
    :param conflict_handler: receives conflict notifications.
        NOTE(review): the default of None raises AttributeError as soon as a
        conflict branch is reached.
    :return: the new ``changeset.Changeset``.
    """
    new_cset = changeset.Changeset()

    def get_this_contents(file_id):
        # Describe what THIS currently holds on disk for file_id.
        path = os.path.join(this.root, inventory.this.get_path(file_id))
        if os.path.isdir(path):
            return changeset.dir_create
        # Close the handle explicitly rather than leaving it to the
        # garbage collector.
        source = open(path, "rb")
        try:
            return changeset.FileCreate(source.read())
        finally:
            source.close()

    for entry in cset.entries.itervalues():
        if entry.is_boring():
            new_cset.add_entry(entry)
        elif entry.is_creation(False):
            if inventory.this.get_path(entry.id) is None:
                new_cset.add_entry(entry)
            else:
                this_contents = get_this_contents(entry.id)
                other_contents = entry.contents_change.new_contents
                if other_contents == this_contents:
                    # THIS already has identical contents: keep only the
                    # id/parent/path part of the creation.
                    boring_entry = changeset.ChangesetEntry(entry.id,
                                                            entry.new_parent,
                                                            entry.new_path)
                    new_cset.add_entry(boring_entry)
                else:
                    conflict_handler.contents_conflict(this_contents,
                                                       other_contents)

        elif entry.is_deletion(False):
            entry_path = inventory.this.get_path(entry.id)
            if entry_path is None:
                # Nothing to delete in THIS.
                boring_entry = changeset.ChangesetEntry(entry.id, entry.parent,
                                                        entry.path)
                new_cset.add_entry(boring_entry)
            elif entry.contents_change is not None:
                this_contents = get_this_contents(entry.id)
                base_contents = entry.contents_change.old_contents
                if base_contents == this_contents:
                    new_cset.add_entry(entry)
                else:
                    conflict_handler.rem_contents_conflict(entry_path,
                                                           this_contents,
                                                           base_contents)

            else:
                new_cset.add_entry(entry)
        else:
            entry = get_merge_entry(entry, inventory, base, other,
                                    conflict_handler)
            if entry is not None:
                new_cset.add_entry(entry)
    return new_cset
90
91
92
def get_merge_entry(entry, inventory, base, other, conflict_handler):
    """Build the changeset entry that merges ``entry`` into THIS.

    Name and parent are merged independently.  If BASE and OTHER agree on a
    value, THIS keeps its own; otherwise OTHER's value is taken, unless THIS
    differs from both, in which case ``conflict_handler.rename_conflict`` or
    ``conflict_handler.move_conflict`` is called.

    :param entry: changeset entry being merged; its ``id``,
        ``contents_change`` and ``metadata_change`` are read.
    :param inventory: object with ``this``, ``base`` and ``other``
        inventories offering ``get_name``, ``get_parent`` and ``get_dir``.
    :param base: object whose ``readonly_path(id)`` locates the BASE file.
    :param other: object whose ``readonly_path(id)`` locates the OTHER file.
    :param conflict_handler: receives ``merge_missing``, ``rename_conflict``
        and ``move_conflict`` notifications.
    :return: the new ``changeset.ChangesetEntry``, or the result of
        ``conflict_handler.merge_missing`` when THIS has no name for the id.
    """
    this_name = inventory.this.get_name(entry.id)
    this_parent = inventory.this.get_parent(entry.id)
    this_dir = inventory.this.get_dir(entry.id)
    if this_dir is None:
        this_dir = ""
    if this_name is None:
        return conflict_handler.merge_missing(entry.id, inventory)

    base_name = inventory.base.get_name(entry.id)
    base_parent = inventory.base.get_parent(entry.id)
    other_name = inventory.other.get_name(entry.id)
    other_parent = inventory.other.get_parent(entry.id)
    # The directory of the OTHER side must come from OTHER's inventory,
    # since it is what new_path is built from when OTHER moved the file.
    other_dir = inventory.other.get_dir(entry.id)
    if other_dir is None:
        other_dir = ""

    if base_name == other_name:
        old_name = this_name
        new_name = this_name
    else:
        if this_name != base_name and this_name != other_name:
            # NOTE(review): if the handler returns instead of raising,
            # old_name/new_name stay unbound and the code below fails.
            conflict_handler.rename_conflict(entry.id, this_name, base_name,
                                             other_name)
        else:
            old_name = this_name
            new_name = other_name

    if base_parent == other_parent:
        old_parent = this_parent
        new_parent = this_parent
        old_dir = this_dir
        new_dir = this_dir
    else:
        if this_parent != base_parent and this_parent != other_parent:
            # NOTE(review): same caveat as for rename_conflict above.
            conflict_handler.move_conflict(entry.id, inventory)
        else:
            old_parent = this_parent
            old_dir = this_dir
            new_parent = other_parent
            new_dir = other_dir
    # ChangesetEntry is given a path everywhere else in this file, so pass
    # the full old path rather than the bare name.
    old_path = os.path.join(old_dir, old_name)
    new_entry = changeset.ChangesetEntry(entry.id, old_parent, old_path)
    if new_name is not None or new_parent is not None:
        new_entry.new_path = os.path.join(new_dir, new_name)
    else:
        new_entry.new_path = None
    new_entry.new_parent = new_parent

    base_path = base.readonly_path(entry.id)
    other_path = other.readonly_path(entry.id)

    # Content and metadata changes are replaced by three-way merge
    # operations between the BASE and OTHER files.
    if entry.contents_change is not None:
        new_entry.contents_change = changeset.Diff3Merge(base_path, other_path)
    if entry.metadata_change is not None:
        new_entry.metadata_change = PermissionsMerge(base_path, other_path)

    return new_entry
153
558 by Martin Pool
- All top-level classes inherit from object
154
class PermissionsMerge(object):
    """Three-way merge of the permission bits of a file.

    ``base_path`` and ``other_path`` name the files whose modes are compared
    with the mode of the file the merge is applied to.
    """

    def __init__(self, base_path, other_path):
        self.base_path, self.other_path = base_path, other_path

    def apply(self, filename, conflict_handler, reverse=False):
        """Merge the base -> other permission change into ``filename``.

        With ``reverse`` set, the roles of the base and other paths are
        swapped.  Only the low nine mode bits are compared.  Nothing happens
        when base and other agree or when ``filename`` already matches
        other; when ``filename`` still matches base it is chmod'ed to
        other's ``st_mode``; otherwise
        ``conflict_handler.permission_conflict`` is called.
        """
        if reverse:
            base, other = self.other_path, self.base_path
        else:
            base, other = self.base_path, self.other_path
        base_stat = os.stat(base).st_mode
        other_stat = os.stat(other).st_mode
        this_stat = os.stat(filename).st_mode

        perm_bits = 0o777
        base_perms = base_stat & perm_bits
        other_perms = other_stat & perm_bits
        this_perms = this_stat & perm_bits

        if other_perms == base_perms or other_perms == this_perms:
            # Either there is no change to merge or it is already in place.
            return
        if this_perms == base_perms:
            os.chmod(filename, other_stat)
        else:
            conflict_handler.permission_conflict(filename, base, other)
177
178
179
import unittest
180
import tempfile
181
import shutil
558 by Martin Pool
- All top-level classes inherit from object
182
class MergeTree(object):
    """A directory on disk plus an id -> relative-path inventory.

    Used by the tests below to build small trees.  The inventory starts
    with id ``'0'`` mapped to ``""``, i.e. the tree directory itself.
    """

    def __init__(self, dir):
        """Create the directory ``dir`` and an inventory holding the root."""
        self.dir = dir
        os.mkdir(dir)
        self.inventory = {'0': ""}

    def child_path(self, parent, name):
        """Return the tree-relative path of ``name`` under id ``parent``."""
        return os.path.join(self.inventory[parent], name)

    def add_file(self, id, parent, name, contents, mode):
        """Create a file with ``contents`` and ``mode`` and record its id."""
        path = self.child_path(parent, name)
        full_path = self.abs_path(path)
        assert not os.path.exists(full_path)
        # Close the handle explicitly so the data is on disk before chmod.
        output = open(full_path, "wb")
        try:
            output.write(contents)
        finally:
            output.close()
        os.chmod(full_path, mode)
        self.inventory[id] = path

    def add_dir(self, id, parent, name, mode):
        """Create a directory with ``mode`` and record its id."""
        path = self.child_path(parent, name)
        full_path = self.abs_path(path)
        assert not os.path.exists(full_path)
        os.mkdir(full_path)
        os.chmod(full_path, mode)
        self.inventory[id] = path

    def abs_path(self, path):
        """Return the tree-relative ``path`` joined onto the tree directory."""
        return os.path.join(self.dir, path)

    def full_path(self, id):
        """Return the on-disk path recorded for ``id``."""
        return self.abs_path(self.inventory[id])

    def change_path(self, id, path):
        """Rename the item ``id`` on disk to ``path`` and update the inventory."""
        os.rename(self.abs_path(self.inventory[id]), self.abs_path(path))
        self.inventory[id] = path
217
558 by Martin Pool
- All top-level classes inherit from object
218
class MergeBuilder(object):
    """Test fixture holding BASE, THIS and OTHER trees plus a changeset.

    The three trees live in ``base``, ``this`` and ``other`` directories
    under a fresh temporary directory.  ``cset`` is maintained by hand
    alongside them and is what ``merge_changeset`` feeds to
    ``make_merge_changeset``.  Call ``cleanup`` to remove the directory.
    """

    def __init__(self):
        self.dir = tempfile.mkdtemp(prefix="BaZing")
        self.base = MergeTree(os.path.join(self.dir, "base"))
        self.this = MergeTree(os.path.join(self.dir, "this"))
        self.other = MergeTree(os.path.join(self.dir, "other"))

        self.cset = changeset.Changeset()
        self.cset.add_entry(changeset.ChangesetEntry("0",
                                                     changeset.NULL_ID, "./."))

    def _read_file(self, path):
        """Return the contents of ``path``, closing the file afterwards."""
        source = open(path)
        try:
            return source.read()
        finally:
            source.close()

    def get_cset_path(self, parent, name):
        """Return ``name`` joined onto the changeset path of ``parent``.

        Returns None when ``name`` is None (``parent`` must then be None).
        """
        if name is None:
            assert (parent is None)
            return None
        return os.path.join(self.cset.entries[parent].path, name)

    def add_file(self, id, parent, name, contents, mode):
        """Add an identical file to all three trees and to the changeset."""
        self.base.add_file(id, parent, name, contents, mode)
        self.this.add_file(id, parent, name, contents, mode)
        self.other.add_file(id, parent, name, contents, mode)
        path = self.get_cset_path(parent, name)
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))

    def add_dir(self, id, parent, name, mode):
        """Add an identical directory to all three trees and the changeset."""
        path = self.get_cset_path(parent, name)
        self.base.add_dir(id, parent, name, mode)
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
        self.this.add_dir(id, parent, name, mode)
        self.other.add_dir(id, parent, name, mode)

    def change_name(self, id, base=None, this=None, other=None):
        """Rename ``id`` in whichever trees are given a new name.

        A BASE rename updates the changeset entry's ``name``; an OTHER
        rename updates its ``new_name``.  THIS renames leave it untouched.
        """
        if base is not None:
            self.change_name_tree(id, self.base, base)
            self.cset.entries[id].name = base

        if this is not None:
            self.change_name_tree(id, self.this, this)

        if other is not None:
            self.change_name_tree(id, self.other, other)
            self.cset.entries[id].new_name = other

    def change_parent(self, id, base=None, this=None, other=None):
        """Move ``id`` under a new parent id in whichever trees are given.

        A BASE move updates the changeset entry's ``parent``/``dir``; an
        OTHER move updates ``new_parent``/``new_dir``.
        """
        if base is not None:
            self.change_parent_tree(id, self.base, base)
            self.cset.entries[id].parent = base
            self.cset.entries[id].dir = self.cset.entries[base].path

        if this is not None:
            self.change_parent_tree(id, self.this, this)

        if other is not None:
            self.change_parent_tree(id, self.other, other)
            self.cset.entries[id].new_parent = other
            self.cset.entries[id].new_dir = \
                self.cset.entries[other].new_path

    def change_contents(self, id, base=None, this=None, other=None):
        """Rewrite the file ``id`` in whichever trees are given contents.

        When BASE or OTHER changed, the changeset entry receives a
        ``ReplaceFileContents`` built from the BASE and OTHER files.
        """
        if base is not None:
            self.change_contents_tree(id, self.base, base)

        if this is not None:
            self.change_contents_tree(id, self.this, this)

        if other is not None:
            self.change_contents_tree(id, self.other, other)

        if base is not None or other is not None:
            old_contents = self._read_file(self.base.full_path(id))
            new_contents = self._read_file(self.other.full_path(id))
            contents = changeset.ReplaceFileContents(old_contents,
                                                     new_contents)
            self.cset.entries[id].contents_change = contents

    def change_perms(self, id, base=None, this=None, other=None):
        """chmod ``id`` in whichever trees are given a mode.

        When BASE or OTHER changed, the changeset entry receives a
        ``ChangeUnixPermissions`` built from the BASE and OTHER modes.
        """
        if base is not None:
            self.change_perms_tree(id, self.base, base)

        if this is not None:
            self.change_perms_tree(id, self.this, this)

        if other is not None:
            self.change_perms_tree(id, self.other, other)

        if base is not None or other is not None:
            # Keep all nine rwx bits; a 077 mask would drop the owner bits.
            old_perms = os.stat(self.base.full_path(id)).st_mode & 0o777
            new_perms = os.stat(self.other.full_path(id)).st_mode & 0o777
            contents = changeset.ChangeUnixPermissions(old_perms,
                                                       new_perms)
            self.cset.entries[id].metadata_change = contents

    def change_name_tree(self, id, tree, name):
        """Rename ``id`` inside ``tree``, keeping the changeset's parent."""
        new_path = tree.child_path(self.cset.entries[id].parent, name)
        tree.change_path(id, new_path)

    def change_parent_tree(self, id, tree, parent):
        """Move ``id`` under ``parent`` in ``tree``, keeping the changeset's name."""
        new_path = tree.child_path(parent, self.cset.entries[id].name)
        tree.change_path(id, new_path)

    def change_contents_tree(self, id, tree, contents):
        """Overwrite the file ``id`` in ``tree``, restoring its mode afterwards."""
        path = tree.full_path(id)
        mode = os.stat(path).st_mode
        output = open(path, "w")
        try:
            output.write(contents)
        finally:
            output.close()
        os.chmod(path, mode)

    def change_perms_tree(self, id, tree, mode):
        """chmod the item ``id`` in ``tree`` to ``mode``."""
        os.chmod(tree.full_path(id), mode)

    def merge_changeset(self):
        """Return the merge changeset for the current state of the trees.

        Conflicts are reported through a ``changeset.ExceptionConflictHandler``.
        """
        all_inventory = ThreewayInventory(Inventory(self.this.inventory),
                                          Inventory(self.base.inventory),
                                          Inventory(self.other.inventory))
        conflict_handler = changeset.ExceptionConflictHandler(self.this.dir)
        return make_merge_changeset(self.cset, all_inventory, self.this.dir,
                                    self.base.dir, self.other.dir,
                                    conflict_handler)

    def apply_changeset(self, cset, conflict_handler=None, reverse=False):
        """Apply ``cset`` to THIS and store the inventory it returns."""
        self.this.inventory = \
            changeset.apply_changeset(cset, self.this.inventory,
                                      self.this.dir, conflict_handler,
                                      reverse)

    def cleanup(self):
        """Delete the temporary directory holding all three trees."""
        shutil.rmtree(self.dir)
343
344
345
class MergeTest(unittest.TestCase):
    """Exercise make_merge_changeset through MergeBuilder fixtures.

    Every builder is cleaned up in a ``finally`` clause so a failing
    assertion does not leave its temporary directory behind.
    """

    def test_change_name(self):
        """Test renames"""
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "hello1", 0o755)
            builder.change_name("1", other="name2")
            builder.add_file("2", "0", "name3", "hello2", 0o755)
            builder.change_name("2", base="name4")
            builder.add_file("3", "0", "name5", "hello3", 0o755)
            builder.change_name("3", this="name6")
            cset = builder.merge_changeset()
            self.assertTrue(cset.entries["2"].is_boring())
            self.assertEqual(cset.entries["1"].name, "name1")
            self.assertEqual(cset.entries["1"].new_name, "name2")
            self.assertTrue(cset.entries["3"].is_boring())
            # Sanity-check the fixture: each tree lives below the builder
            # directory and its inventory holds relative, existing paths.
            for tree in (builder.this, builder.other, builder.base):
                self.assertTrue(tree.dir != builder.dir and
                                tree.dir.startswith(builder.dir))
                for path in tree.inventory.itervalues():
                    fullpath = tree.abs_path(path)
                    self.assertTrue(fullpath.startswith(tree.dir))
                    self.assertFalse(path.startswith(tree.dir))
                    self.assertTrue(os.path.exists(fullpath))
            builder.apply_changeset(cset)
        finally:
            builder.cleanup()

        # THIS and OTHER rename the same file differently: conflict.
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "hello1", 0o644)
            builder.change_name("1", other="name2", this="name3")
            self.assertRaises(changeset.RenameConflict,
                              builder.merge_changeset)
        finally:
            builder.cleanup()

    def test_file_moves(self):
        """Test moves"""
        builder = MergeBuilder()
        try:
            builder.add_dir("1", "0", "dir1", 0o755)
            builder.add_dir("2", "0", "dir2", 0o755)
            builder.add_file("3", "1", "file1", "hello1", 0o644)
            builder.add_file("4", "1", "file2", "hello2", 0o644)
            builder.add_file("5", "1", "file3", "hello3", 0o644)
            builder.change_parent("3", other="2")
            self.assertEqual(
                Inventory(builder.other.inventory).get_parent("3"), "2")
            builder.change_parent("4", this="2")
            self.assertEqual(
                Inventory(builder.this.inventory).get_parent("4"), "2")
            builder.change_parent("5", base="2")
            self.assertEqual(
                Inventory(builder.base.inventory).get_parent("5"), "2")
            cset = builder.merge_changeset()
            for id in ("1", "2", "4", "5"):
                self.assertTrue(cset.entries[id].is_boring())
            self.assertEqual(cset.entries["3"].parent, "1")
            self.assertEqual(cset.entries["3"].new_parent, "2")
            builder.apply_changeset(cset)
        finally:
            builder.cleanup()

        # THIS and OTHER move the same file to different parents: conflict.
        builder = MergeBuilder()
        try:
            builder.add_dir("1", "0", "dir1", 0o755)
            builder.add_dir("2", "0", "dir2", 0o755)
            builder.add_dir("3", "0", "dir3", 0o755)
            builder.add_file("4", "1", "file1", "hello1", 0o644)
            builder.change_parent("4", other="2", this="3")
            self.assertRaises(changeset.MoveConflict,
                              builder.merge_changeset)
        finally:
            builder.cleanup()

    def test_contents_merge(self):
        """Test diff3 merging"""
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "text1", 0o755)
            builder.change_contents("1", other="text4")
            builder.add_file("2", "0", "name3", "text2", 0o655)
            builder.change_contents("2", base="text5")
            builder.add_file("3", "0", "name5", "text3", 0o744)
            builder.change_contents("3", this="text6")
            cset = builder.merge_changeset()
            self.assertTrue(cset.entries["1"].contents_change is not None)
            self.assertTrue(isinstance(cset.entries["1"].contents_change,
                                       changeset.Diff3Merge))
            self.assertTrue(isinstance(cset.entries["2"].contents_change,
                                       changeset.Diff3Merge))
            self.assertTrue(cset.entries["3"].is_boring())
            builder.apply_changeset(cset)
            self.assertEqual(self._read(builder.this.full_path("1")), "text4")
            self.assertEqual(self._read(builder.this.full_path("2")), "text2")
            for id, mode in (("1", 0o755), ("2", 0o655), ("3", 0o744)):
                self.assertEqual(
                    os.stat(builder.this.full_path(id)).st_mode & 0o777, mode)
        finally:
            builder.cleanup()

        # THIS and OTHER change the same text differently: conflict on apply.
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "text1", 0o755)
            builder.change_contents("1", other="text4", this="text3")
            cset = builder.merge_changeset()
            self.assertRaises(changeset.MergeConflict,
                              builder.apply_changeset, cset)
        finally:
            builder.cleanup()

    def test_perms_merge(self):
        """Test merging of permission changes"""
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "text1", 0o755)
            builder.change_perms("1", other=0o655)
            builder.add_file("2", "0", "name2", "text2", 0o755)
            builder.change_perms("2", base=0o655)
            builder.add_file("3", "0", "name3", "text3", 0o755)
            builder.change_perms("3", this=0o655)
            cset = builder.merge_changeset()
            self.assertTrue(cset.entries["1"].metadata_change is not None)
            self.assertTrue(isinstance(cset.entries["1"].metadata_change,
                                       PermissionsMerge))
            self.assertTrue(isinstance(cset.entries["2"].metadata_change,
                                       PermissionsMerge))
            self.assertTrue(cset.entries["3"].is_boring())
            builder.apply_changeset(cset)
            for id, mode in (("1", 0o655), ("2", 0o755), ("3", 0o655)):
                self.assertEqual(
                    os.stat(builder.this.full_path(id)).st_mode & 0o777, mode)
        finally:
            builder.cleanup()

        # BASE and OTHER both differ from THIS: conflict on apply.
        builder = MergeBuilder()
        try:
            builder.add_file("1", "0", "name1", "text1", 0o755)
            builder.change_perms("1", other=0o655, base=0o555)
            cset = builder.merge_changeset()
            self.assertRaises(changeset.MergePermissionConflict,
                              builder.apply_changeset, cset)
        finally:
            builder.cleanup()

    def _read(self, path):
        """Return the binary contents of ``path``, closing the file."""
        source = open(path, "rb")
        try:
            return source.read()
        finally:
            source.close()
468
469
def test():
    """Run every ``test_``-prefixed MergeTest method with a text runner."""
    suite = unittest.makeSuite(MergeTest, 'test_')
    unittest.TextTestRunner().run(suite)


# Run the merge tests when this module is executed as a script.
if __name__ == "__main__":
    test()