~bzr-pqm/bzr/bzr.dev

1092.1.24 by Robert Collins
move merge_core tests into the selftest package. Also reduce double-run of those tests
1
import os
2
import unittest
3
1141 by Martin Pool
- rename FunctionalTest to TestCaseInTempDir
4
from bzrlib.selftest import TestCaseInTempDir, TestCase
1092.1.24 by Robert Collins
move merge_core tests into the selftest package. Also reduce double-run of those tests
5
from bzrlib.branch import ScratchBranch, Branch
6
from bzrlib.errors import NotBranchError, NotVersionedError
7
8
9
import tempfile
10
import shutil
11
from bzrlib.inventory import InventoryEntry, RootEntry
12
from bzrlib.osutils import file_kind
13
from bzrlib import changeset
14
from bzrlib.merge_core import (ApplyMerge3, make_merge_changeset,
15
                                BackupBeforeChange, PermissionsMerge)
16
from bzrlib.changeset import Inventory, apply_changeset, invert_dict
17
18
class FalseTree(object):
19
    def __init__(self, realtree):
20
        self._realtree = realtree
21
        self.inventory = self
22
23
    def __getitem__(self, file_id):
24
        entry = self.make_inventory_entry(file_id)
25
        if entry is None:
26
            raise KeyError(file_id)
27
        return entry
28
        
29
    def make_inventory_entry(self, file_id):
30
        path = self._realtree.inventory.get(file_id)
31
        if path is None:
32
            return None
33
        if path == "":
34
            return RootEntry(file_id)
35
        dir, name = os.path.split(path)
36
        kind = file_kind(self._realtree.abs_path(path))
37
        for parent_id, path in self._realtree.inventory.iteritems():
38
            if path == dir:
39
                break
40
        if path != dir:
41
            raise Exception("Can't find parent for %s" % name)
42
        return InventoryEntry(file_id, name, kind, parent_id)
43
44
45
class MergeTree(object):
46
    def __init__(self, dir):
47
        self.dir = dir;
48
        os.mkdir(dir)
49
        self.inventory = {'0': ""}
50
        self.tree = FalseTree(self)
51
    
52
    def child_path(self, parent, name):
53
        return os.path.join(self.inventory[parent], name)
54
55
    def add_file(self, id, parent, name, contents, mode):
56
        path = self.child_path(parent, name)
57
        full_path = self.abs_path(path)
58
        assert not os.path.exists(full_path)
59
        file(full_path, "wb").write(contents)
60
        os.chmod(self.abs_path(path), mode)
61
        self.inventory[id] = path
62
63
    def remove_file(self, id):
64
        os.unlink(self.full_path(id))
65
        del self.inventory[id]
66
67
    def add_dir(self, id, parent, name, mode):
68
        path = self.child_path(parent, name)
69
        full_path = self.abs_path(path)
70
        assert not os.path.exists(full_path)
71
        os.mkdir(self.abs_path(path))
72
        os.chmod(self.abs_path(path), mode)
73
        self.inventory[id] = path
74
75
    def abs_path(self, path):
76
        return os.path.join(self.dir, path)
77
78
    def full_path(self, id):
79
        try:
80
            tree_path = self.inventory[id]
81
        except KeyError:
82
            return None
83
        return self.abs_path(tree_path)
84
85
    def readonly_path(self, id):
86
        return self.full_path(id)
87
88
    def __contains__(self, file_id):
89
        return file_id in self.inventory
90
91
    def has_or_had_id(self, file_id):
92
        return file_id in self
93
94
    def get_file(self, file_id):
95
        path = self.readonly_path(file_id)
96
        return file(path, "rb")
97
98
    def id2path(self, file_id):
99
        return self.inventory[file_id]
100
101
    def change_path(self, id, path):
1143 by Martin Pool
- remove dead code and remove some small errors (pychecker)
102
        old_path = os.path.join(self.dir, self.inventory[id])
103
        os.rename(old_path, self.abs_path(path))
1092.1.24 by Robert Collins
move merge_core tests into the selftest package. Also reduce double-run of those tests
104
        self.inventory[id] = path
105
106
107
class MergeBuilder(object):
108
    def __init__(self):
109
        self.dir = tempfile.mkdtemp(prefix="BaZing")
110
        self.base = MergeTree(os.path.join(self.dir, "base"))
111
        self.this = MergeTree(os.path.join(self.dir, "this"))
112
        self.other = MergeTree(os.path.join(self.dir, "other"))
113
        
114
        self.cset = changeset.Changeset()
115
        self.cset.add_entry(changeset.ChangesetEntry("0", 
116
                                                     changeset.NULL_ID, "./."))
117
    def get_cset_path(self, parent, name):
118
        if name is None:
119
            assert (parent is None)
120
            return None
121
        return os.path.join(self.cset.entries[parent].path, name)
122
123
    def add_file(self, id, parent, name, contents, mode):
124
        self.base.add_file(id, parent, name, contents, mode)
125
        self.this.add_file(id, parent, name, contents, mode)
126
        self.other.add_file(id, parent, name, contents, mode)
127
        path = self.get_cset_path(parent, name)
128
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
129
130
    def remove_file(self, id, base=False, this=False, other=False):
131
        for option, tree in ((base, self.base), (this, self.this), 
132
                             (other, self.other)):
133
            if option:
134
                tree.remove_file(id)
135
            if other or base:
136
                change = self.cset.entries[id].contents_change
137
                if change is None:
138
                    change = changeset.ReplaceContents(None, None)
139
                    self.cset.entries[id].contents_change = change
140
                    def create_file(tree):
141
                        return changeset.FileCreate(tree.get_file(id).read())
142
                    if not other:
143
                        change.new_contents = create_file(self.other)
144
                    if not base:
145
                        change.old_contents = create_file(self.base)
146
                else:
147
                    assert isinstance(change, changeset.ReplaceContents)
148
                if other:
149
                    change.new_contents=None
150
                if base:
151
                    change.old_contents=None
152
                if change.old_contents is None and change.new_contents is None:
153
                    change = None
154
155
156
    def add_dir(self, id, parent, name, mode):
157
        path = self.get_cset_path(parent, name)
158
        self.base.add_dir(id, parent, name, mode)
159
        self.cset.add_entry(changeset.ChangesetEntry(id, parent, path))
160
        self.this.add_dir(id, parent, name, mode)
161
        self.other.add_dir(id, parent, name, mode)
162
163
164
    def change_name(self, id, base=None, this=None, other=None):
165
        if base is not None:
166
            self.change_name_tree(id, self.base, base)
167
            self.cset.entries[id].name = base
168
169
        if this is not None:
170
            self.change_name_tree(id, self.this, this)
171
172
        if other is not None:
173
            self.change_name_tree(id, self.other, other)
174
            self.cset.entries[id].new_name = other
175
176
    def change_parent(self, id, base=None, this=None, other=None):
177
        if base is not None:
178
            self.change_parent_tree(id, self.base, base)
179
            self.cset.entries[id].parent = base
180
            self.cset.entries[id].dir = self.cset.entries[base].path
181
182
        if this is not None:
183
            self.change_parent_tree(id, self.this, this)
184
185
        if other is not None:
186
            self.change_parent_tree(id, self.other, other)
187
            self.cset.entries[id].new_parent = other
188
            self.cset.entries[id].new_dir = \
189
                self.cset.entries[other].new_path
190
191
    def change_contents(self, id, base=None, this=None, other=None):
192
        if base is not None:
193
            self.change_contents_tree(id, self.base, base)
194
195
        if this is not None:
196
            self.change_contents_tree(id, self.this, this)
197
198
        if other is not None:
199
            self.change_contents_tree(id, self.other, other)
200
201
        if base is not None or other is not None:
202
            old_contents = file(self.base.full_path(id)).read()
203
            new_contents = file(self.other.full_path(id)).read()
204
            contents = changeset.ReplaceFileContents(old_contents, 
205
                                                     new_contents)
206
            self.cset.entries[id].contents_change = contents
207
208
    def change_perms(self, id, base=None, this=None, other=None):
209
        if base is not None:
210
            self.change_perms_tree(id, self.base, base)
211
212
        if this is not None:
213
            self.change_perms_tree(id, self.this, this)
214
215
        if other is not None:
216
            self.change_perms_tree(id, self.other, other)
217
218
        if base is not None or other is not None:
219
            old_perms = os.stat(self.base.full_path(id)).st_mode &077
220
            new_perms = os.stat(self.other.full_path(id)).st_mode &077
221
            contents = changeset.ChangeUnixPermissions(old_perms, 
222
                                                       new_perms)
223
            self.cset.entries[id].metadata_change = contents
224
225
    def change_name_tree(self, id, tree, name):
226
        new_path = tree.child_path(self.cset.entries[id].parent, name)
227
        tree.change_path(id, new_path)
228
229
    def change_parent_tree(self, id, tree, parent):
230
        new_path = tree.child_path(parent, self.cset.entries[id].name)
231
        tree.change_path(id, new_path)
232
233
    def change_contents_tree(self, id, tree, contents):
234
        path = tree.full_path(id)
235
        mode = os.stat(path).st_mode
236
        file(path, "w").write(contents)
237
        os.chmod(path, mode)
238
239
    def change_perms_tree(self, id, tree, mode):
240
        os.chmod(tree.full_path(id), mode)
241
242
    def merge_changeset(self, merge_factory):
974.1.83 by Aaron Bentley
Removed unused dir parameter from ExceptionConflictHandler
243
        conflict_handler = changeset.ExceptionConflictHandler()
1092.1.24 by Robert Collins
move merge_core tests into the selftest package. Also reduce double-run of those tests
244
        return make_merge_changeset(self.cset, self.this, self.base,
245
                                    self.other, conflict_handler,
246
                                    merge_factory)
247
248
    def apply_inv_change(self, inventory_change, orig_inventory):
249
        orig_inventory_by_path = {}
250
        for file_id, path in orig_inventory.iteritems():
251
            orig_inventory_by_path[path] = file_id
252
253
        def parent_id(file_id):
254
            try:
255
                parent_dir = os.path.dirname(orig_inventory[file_id])
256
            except:
257
                print file_id
258
                raise
259
            if parent_dir == "":
260
                return None
261
            return orig_inventory_by_path[parent_dir]
262
        
263
        def new_path(file_id):
264
            if inventory_change.has_key(file_id):
265
                return inventory_change[file_id]
266
            else:
267
                parent = parent_id(file_id)
268
                if parent is None:
269
                    return orig_inventory[file_id]
270
                dirname = new_path(parent)
271
                return os.path.join(dirname, orig_inventory[file_id])
272
273
        new_inventory = {}
274
        for file_id in orig_inventory.iterkeys():
275
            path = new_path(file_id)
276
            if path is None:
277
                continue
278
            new_inventory[file_id] = path
279
280
        for file_id, path in inventory_change.iteritems():
281
            if orig_inventory.has_key(file_id):
282
                continue
283
            new_inventory[file_id] = path
284
        return new_inventory
285
286
    def apply_changeset(self, cset, conflict_handler=None, reverse=False):
287
        inventory_change = changeset.apply_changeset(cset,
288
                                                     self.this.inventory,
289
                                                     self.this.dir,
290
                                                     conflict_handler, reverse)
291
        self.this.inventory =  self.apply_inv_change(inventory_change, 
292
                                                     self.this.inventory)
293
294
    def cleanup(self):
295
        shutil.rmtree(self.dir)
296
297
class MergeTest(unittest.TestCase):
298
    def test_change_name(self):
299
        """Test renames"""
300
        builder = MergeBuilder()
301
        builder.add_file("1", "0", "name1", "hello1", 0755)
302
        builder.change_name("1", other="name2")
303
        builder.add_file("2", "0", "name3", "hello2", 0755)
304
        builder.change_name("2", base="name4")
305
        builder.add_file("3", "0", "name5", "hello3", 0755)
306
        builder.change_name("3", this="name6")
307
        cset = builder.merge_changeset(ApplyMerge3)
308
        assert(cset.entries["2"].is_boring())
309
        assert(cset.entries["1"].name == "name1")
310
        assert(cset.entries["1"].new_name == "name2")
311
        assert(cset.entries["3"].is_boring())
312
        for tree in (builder.this, builder.other, builder.base):
313
            assert(tree.dir != builder.dir and 
314
                   tree.dir.startswith(builder.dir))
315
            for path in tree.inventory.itervalues():
316
                fullpath = tree.abs_path(path)
317
                assert(fullpath.startswith(tree.dir))
318
                assert(not path.startswith(tree.dir))
319
                assert os.path.exists(fullpath)
320
        builder.apply_changeset(cset)
321
        builder.cleanup()
322
        builder = MergeBuilder()
323
        builder.add_file("1", "0", "name1", "hello1", 0644)
324
        builder.change_name("1", other="name2", this="name3")
325
        self.assertRaises(changeset.RenameConflict, 
326
                          builder.merge_changeset, ApplyMerge3)
327
        builder.cleanup()
328
        
329
    def test_file_moves(self):
330
        """Test moves"""
331
        builder = MergeBuilder()
332
        builder.add_dir("1", "0", "dir1", 0755)
333
        builder.add_dir("2", "0", "dir2", 0755)
334
        builder.add_file("3", "1", "file1", "hello1", 0644)
335
        builder.add_file("4", "1", "file2", "hello2", 0644)
336
        builder.add_file("5", "1", "file3", "hello3", 0644)
337
        builder.change_parent("3", other="2")
338
        assert(Inventory(builder.other.inventory).get_parent("3") == "2")
339
        builder.change_parent("4", this="2")
340
        assert(Inventory(builder.this.inventory).get_parent("4") == "2")
341
        builder.change_parent("5", base="2")
342
        assert(Inventory(builder.base.inventory).get_parent("5") == "2")
343
        cset = builder.merge_changeset(ApplyMerge3)
344
        for id in ("1", "2", "4", "5"):
345
            assert(cset.entries[id].is_boring())
346
        assert(cset.entries["3"].parent == "1")
347
        assert(cset.entries["3"].new_parent == "2")
348
        builder.apply_changeset(cset)
349
        builder.cleanup()
350
351
        builder = MergeBuilder()
352
        builder.add_dir("1", "0", "dir1", 0755)
353
        builder.add_dir("2", "0", "dir2", 0755)
354
        builder.add_dir("3", "0", "dir3", 0755)
355
        builder.add_file("4", "1", "file1", "hello1", 0644)
356
        builder.change_parent("4", other="2", this="3")
357
        self.assertRaises(changeset.MoveConflict, 
358
                          builder.merge_changeset, ApplyMerge3)
359
        builder.cleanup()
360
361
    def test_contents_merge(self):
362
        """Test merge3 merging"""
363
        self.do_contents_test(ApplyMerge3)
364
365
    def test_contents_merge2(self):
366
        """Test diff3 merging"""
367
        self.do_contents_test(changeset.Diff3Merge)
368
369
    def test_contents_merge3(self):
370
        """Test diff3 merging"""
371
        def backup_merge(file_id, base, other):
372
            return BackupBeforeChange(ApplyMerge3(file_id, base, other))
373
        builder = self.contents_test_success(backup_merge)
374
        def backup_exists(file_id):
375
            return os.path.exists(builder.this.full_path(file_id)+"~")
376
        assert backup_exists("1")
377
        assert backup_exists("2")
378
        assert not backup_exists("3")
379
        builder.cleanup()
380
381
    def do_contents_test(self, merge_factory):
382
        """Test merging with specified ContentsChange factory"""
383
        builder = self.contents_test_success(merge_factory)
384
        builder.cleanup()
385
        self.contents_test_conflicts(merge_factory)
386
387
    def contents_test_success(self, merge_factory):
388
        from inspect import isclass
389
        builder = MergeBuilder()
390
        builder.add_file("1", "0", "name1", "text1", 0755)
391
        builder.change_contents("1", other="text4")
392
        builder.add_file("2", "0", "name3", "text2", 0655)
393
        builder.change_contents("2", base="text5")
394
        builder.add_file("3", "0", "name5", "text3", 0744)
395
        builder.add_file("4", "0", "name6", "text4", 0744)
396
        builder.remove_file("4", base=True)
397
        assert not builder.cset.entries["4"].is_boring()
398
        builder.change_contents("3", this="text6")
399
        cset = builder.merge_changeset(merge_factory)
400
        assert(cset.entries["1"].contents_change is not None)
401
        if isclass(merge_factory):
402
            assert(isinstance(cset.entries["1"].contents_change,
403
                          merge_factory))
404
            assert(isinstance(cset.entries["2"].contents_change,
405
                          merge_factory))
406
        assert(cset.entries["3"].is_boring())
407
        assert(cset.entries["4"].is_boring())
408
        builder.apply_changeset(cset)
409
        assert(file(builder.this.full_path("1"), "rb").read() == "text4" )
410
        assert(file(builder.this.full_path("2"), "rb").read() == "text2" )
411
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0755)
412
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0655)
413
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0744)
414
        return builder
415
416
    def contents_test_conflicts(self, merge_factory):
417
        builder = MergeBuilder()
418
        builder.add_file("1", "0", "name1", "text1", 0755)
419
        builder.change_contents("1", other="text4", this="text3")
420
        cset = builder.merge_changeset(merge_factory)
421
        self.assertRaises(changeset.MergeConflict, builder.apply_changeset,
422
                          cset)
423
        builder.cleanup()
424
425
        builder = MergeBuilder()
426
        builder.add_file("1", "0", "name1", "text1", 0755)
427
        builder.change_contents("1", other="text4", base="text3")
428
        builder.remove_file("1", base=True)
429
        self.assertRaises(changeset.NewContentsConflict,
430
                          builder.merge_changeset, merge_factory)
431
        builder.cleanup()
432
433
        builder = MergeBuilder()
434
        builder.add_file("1", "0", "name1", "text1", 0755)
435
        builder.change_contents("1", other="text4", base="text3")
436
        builder.remove_file("1", this=True)
437
        self.assertRaises(changeset.MissingForMerge, builder.merge_changeset, 
438
                          merge_factory)
439
        builder.cleanup()
440
441
    def test_perms_merge(self):
442
        builder = MergeBuilder()
443
        builder.add_file("1", "0", "name1", "text1", 0755)
444
        builder.change_perms("1", other=0655)
445
        builder.add_file("2", "0", "name2", "text2", 0755)
446
        builder.change_perms("2", base=0655)
447
        builder.add_file("3", "0", "name3", "text3", 0755)
448
        builder.change_perms("3", this=0655)
449
        cset = builder.merge_changeset(ApplyMerge3)
450
        assert(cset.entries["1"].metadata_change is not None)
451
        assert(isinstance(cset.entries["1"].metadata_change,
452
                          PermissionsMerge))
453
        assert(isinstance(cset.entries["2"].metadata_change,
454
                          PermissionsMerge))
455
        assert(cset.entries["3"].is_boring())
456
        builder.apply_changeset(cset)
457
        assert(os.stat(builder.this.full_path("1")).st_mode &0777 == 0655)
458
        assert(os.stat(builder.this.full_path("2")).st_mode &0777 == 0755)
459
        assert(os.stat(builder.this.full_path("3")).st_mode &0777 == 0655)
460
        builder.cleanup();
461
        builder = MergeBuilder()
462
        builder.add_file("1", "0", "name1", "text1", 0755)
463
        builder.change_perms("1", other=0655, base=0555)
464
        cset = builder.merge_changeset(ApplyMerge3)
465
        self.assertRaises(changeset.MergePermissionConflict, 
466
                     builder.apply_changeset, cset)
467
        builder.cleanup()
1092.1.25 by Robert Collins
prepare to write merge tests
468
1141 by Martin Pool
- rename FunctionalTest to TestCaseInTempDir
469
class FunctionalMergeTest(TestCaseInTempDir):
1092.1.25 by Robert Collins
prepare to write merge tests
470
471
    def test_trivial_star_merge(self):
472
        """Test that merges in a star shape Just Work.""" 
1159 by Martin Pool
- clean up parameters to smart_add and smart_add_branch
473
        from bzrlib.add import smart_add_branch, add_reporter_null
1092.1.34 by Robert Collins
unbreak cmd_branch now that something tests the core of it..
474
        from bzrlib.branch import copy_branch
1092.1.38 by Robert Collins
make a default merge choose a sane base with branch.common_ancestor
475
        from bzrlib.merge import merge
1092.1.33 by Robert Collins
pull the important stuff out of cmd_branch.run to branch.copy_branch
476
        # John starts a branch
1092.1.26 by Robert Collins
start writing star-topology test, realise we need smart-add change
477
        self.build_tree(("original/", "original/file1", "original/file2"))
1185.2.9 by Lalo Martins
getting rid of everything that calls the Branch constructor directly
478
        branch = Branch.initialize("original")
1159 by Martin Pool
- clean up parameters to smart_add and smart_add_branch
479
        smart_add_branch(branch, ["original"], True, add_reporter_null)
1092.1.31 by Robert Collins
start extending the star topology merge test now that smart add is more usable
480
        branch.commit("start branch.", verbose=False)
1092.1.33 by Robert Collins
pull the important stuff out of cmd_branch.run to branch.copy_branch
481
        # Mary branches it.
1092.1.34 by Robert Collins
unbreak cmd_branch now that something tests the core of it..
482
        self.build_tree(("mary/",))
483
        copy_branch(branch, "mary")
1092.1.38 by Robert Collins
make a default merge choose a sane base with branch.common_ancestor
484
        # Now John commits a change
485
        file = open("original/file1", "wt")
486
        file.write("John\n")
487
        file.close()
488
        branch.commit("change file1")
489
        # Mary does too
1185.2.9 by Lalo Martins
getting rid of everything that calls the Branch constructor directly
490
        mary_branch = Branch.open("mary")
1092.1.38 by Robert Collins
make a default merge choose a sane base with branch.common_ancestor
491
        file = open("mary/file2", "wt")
492
        file.write("Mary\n")
493
        file.close()
494
        mary_branch.commit("change file2")
495
        # john should be able to merge with no conflicts.
496
        merge_type = ApplyMerge3
1092.1.41 by Robert Collins
merge from abently, take his fixes for merge in preference
497
        base = [None, None]
1092.1.38 by Robert Collins
make a default merge choose a sane base with branch.common_ancestor
498
        other = ("mary", -1)
499
        merge(other, base, check_clean=True, merge_type=merge_type, this_dir="original")
500
        self.assertEqual("John\n", open("original/file1", "rt").read())
501
        self.assertEqual("Mary\n", open("original/file2", "rt").read())