1
# Copyright (C) 2006 by Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
"""Tests for join between versioned files."""
20
import bzrlib.errors as errors
21
from bzrlib.tests import TestCaseWithTransport
22
from bzrlib.transport import get_transport
23
import bzrlib.versionedfile as versionedfile
26
class TestJoin(TestCaseWithTransport):
27
#Tests have self.versionedfile_factory and self.versionedfile_factory_to
28
#available to create source and target versioned files respectively.
30
def get_source(self, name='source'):
31
"""Get a versioned file we will be joining from."""
32
return self.versionedfile_factory(name,
33
get_transport(self.get_url()),
36
def get_target(self, name='target'):
37
""""Get an empty versioned file to join into."""
38
return self.versionedfile_factory_to(name,
39
get_transport(self.get_url()),
43
f1 = self.get_source()
44
f1.add_lines('r0', [], ['a\n', 'b\n'])
45
f1.add_lines('r1', ['r0'], ['c\n', 'b\n'])
46
f2 = self.get_target()
49
self.assertTrue(f.has_version('r0'))
50
self.assertTrue(f.has_version('r1'))
52
verify_file(self.get_target())
54
self.assertRaises(errors.RevisionNotPresent,
55
f2.join, f1, version_ids=['r3'])
57
#f3 = self.get_file('1')
58
#f3.add_lines('r0', ['a\n', 'b\n'], [])
59
#f3.add_lines('r1', ['c\n', 'b\n'], ['r0'])
60
#f4 = self.get_file('2')
62
#self.assertTrue(f4.has_version('r0'))
63
#self.assertFalse(f4.has_version('r1'))
65
def test_gets_expected_inter_worker(self):
66
source = self.get_source()
67
target = self.get_target()
68
inter = versionedfile.InterVersionedFile.get(source, target)
69
self.assertTrue(isinstance(inter, self.interversionedfile_class))
71
def test_join_versions_joins_ancestors_not_siblings(self):
72
# joining with a version list should bring in ancestors of the
73
# named versions but not siblings thereof.
74
target = self.get_target()
75
target.add_lines('base', [], [])
76
source = self.get_source()
77
source.add_lines('base', [], [])
78
source.add_lines('sibling', ['base'], [])
79
source.add_lines('ancestorleft', ['base'], [])
80
source.add_lines('ancestorright', ['base'], [])
81
source.add_lines('namedleft', ['ancestorleft'], [])
82
source.add_lines('namedright', ['ancestorright'], [])
83
target.join(source, version_ids=['namedleft', 'namedright'])
84
self.assertFalse(target.has_version('sibling'))
85
self.assertTrue(target.has_version('ancestorleft'))
86
self.assertTrue(target.has_version('ancestorright'))
87
self.assertTrue(target.has_version('namedleft'))
88
self.assertTrue(target.has_version('namedright'))
90
def test_join_add_parents(self):
91
"""Join inserting new parents into existing versions
93
The new version must have the right parent list and must identify
94
lines originating in another parent.
96
w1 = self.get_target('w1')
97
w2 = self.get_source('w2')
98
w1.add_lines('v-1', [], ['line 1\n'])
99
w2.add_lines('v-2', [], ['line 2\n'])
100
w1.add_lines('v-3', ['v-1'], ['line 1\n'])
101
w2.add_lines('v-3', ['v-2'], ['line 1\n'])
103
self.assertEqual(sorted(w1.versions()),
104
'v-1 v-2 v-3'.split())
105
self.assertEqualDiff(w1.get_text('v-3'),
107
self.assertEqual(sorted(w1.get_parents('v-3')),
109
ann = list(w1.annotate('v-3'))
110
self.assertEqual(len(ann), 1)
111
self.assertEqual(ann[0][0], 'v-1')
112
self.assertEqual(ann[0][1], 'line 1\n')
114
def build_weave1(self):
115
weave1 = self.get_source()
116
self.lines1 = ['hello\n']
117
self.lines3 = ['hello\n', 'cruel\n', 'world\n']
118
weave1.add_lines('v1', [], self.lines1)
119
weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
120
weave1.add_lines('v3', ['v2'], self.lines3)
123
def test_join_with_empty(self):
124
"""Reweave adding empty weave"""
125
wb = self.get_target()
126
w1 = self.build_weave1()
128
self.verify_weave1(w1)
130
def verify_weave1(self, w1):
131
self.assertEqual(sorted(w1.versions()), ['v1', 'v2', 'v3'])
132
self.assertEqual(w1.get_lines('v1'), ['hello\n'])
133
self.assertEqual([], w1.get_parents('v1'))
134
self.assertEqual(w1.get_lines('v2'), ['hello\n', 'world\n'])
135
self.assertEqual(['v1'], w1.get_parents('v2'))
136
self.assertEqual(w1.get_lines('v3'), ['hello\n', 'cruel\n', 'world\n'])
137
self.assertEqual(['v2'], w1.get_parents('v3'))
139
def test_join_with_ghosts_merges_parents(self):
140
"""Join combined parent lists"""
141
wa = self.build_weave1()
142
wb = self.get_target()
143
wb.add_lines('x1', [], ['line from x1\n'])
144
wb.add_lines('v1', [], ['hello\n'])
145
wb.add_lines('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
147
self.assertEqual(['v1','x1'], wa.get_parents('v2'))
149
def test_join_with_ghosts(self):
150
"""Join that inserts parents of an existing revision.
152
This can happen when merging from another branch who
153
knows about revisions the destination does not. In
154
this test the second weave knows of an additional parent of
155
v2. Any revisions which are in common still have to have the
158
w1 = self.build_weave1()
159
wb = self.get_target()
160
wb.add_lines('x1', [], ['line from x1\n'])
161
wb.add_lines('v1', [], ['hello\n'])
162
wb.add_lines('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
164
eq = self.assertEquals
165
eq(sorted(w1.versions()), ['v1', 'v2', 'v3', 'x1',])
166
eq(w1.get_text('x1'), 'line from x1\n')
167
eq(w1.get_lines('v2'), ['hello\n', 'world\n'])
168
eq(w1.get_parents('v2'), ['v1', 'x1'])
170
def test_join_with_ignore_missing_versions(self):
171
# test that ignore_missing=True makes a listed but absent version id
172
# be ignored, and that unlisted version_ids are not integrated.
173
w1 = self.build_weave1()
174
wb = self.get_target()
175
wb.add_lines('x1', [], ['line from x1\n'])
176
wb.add_lines('v1', [], ['hello\n'])
177
wb.add_lines('v2', ['v1', 'x1'], ['hello\n', 'world\n'])
178
w1.join(wb, version_ids=['x1', 'z1'], ignore_missing=True)
179
eq = self.assertEquals
180
eq(sorted(w1.versions()), ['v1', 'v2', 'v3', 'x1'])
181
eq(w1.get_text('x1'), 'line from x1\n')
182
eq(w1.get_lines('v2'), ['hello\n', 'world\n'])
183
eq(w1.get_parents('v2'), ['v1'])
185
def build_source_weave(self, name, *pattern):
186
w = self.get_source(name)
187
for version, parents in pattern:
188
w.add_lines(version, parents, [])
191
def build_target_weave(self, name, *pattern):
192
w = self.get_target(name)
193
for version, parents in pattern:
194
w.add_lines(version, parents, [])
197
def test_join_reorder(self):
198
"""Reweave requiring reordering of versions.
200
Weaves must be stored such that parents come before children. When
201
reweaving, we may add new parents to some children, but it is required
202
that there must be *some* valid order that can be found, otherwise the
203
ancestries are contradictory. (For the specific case of inserting
204
ghost revisions there will be no disagreement, only partial knowledge
207
Note that the weaves are only partially ordered: when there are two
208
versions where neither is an ancestor of the other the order in which
209
they occur is unconstrained. When we join those versions into
210
another weave, they may become more constrained and it may be
211
necessary to change their order.
213
One simple case of this is
218
We need to recognize that the final weave must show the ordering
219
a[], b[a], c[b]. The version that must be first in the result is
220
not first in either of the input weaves.
222
w1 = self.build_target_weave('1', ('c', []), ('a', []), ('b', ['a']))
223
w2 = self.build_source_weave('2', ('b', []), ('c', ['b']), ('a', []))
225
self.assertEqual([], w1.get_parents('a'))
226
self.assertEqual(['a'], w1.get_parents('b'))
227
self.assertEqual(['b'], w1.get_parents('c'))