1
# Copyright (C) 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Annotators."""
29
def load_tests(standard_tests, module, loader):
    """Parameterize the tests for each available Annotator implementation.

    The tests are permuted over the 'bzrlib._annotator_py' and
    'bzrlib._annotator_pyx' modules.

    :param standard_tests: The tests already loaded from this module.
    :param module: This test module (unused here).
    :param loader: The loader handed on to permute_tests_for_extension.
    :return: The permuted suite.
    """
    suite, _ = tests.permute_tests_for_extension(
        standard_tests, loader,
        'bzrlib._annotator_py', 'bzrlib._annotator_pyx')
    # Without this return the hook yields None and the permuted suite is
    # thrown away.
    return suite
36
class TestAnnotator(tests.TestCaseWithMemoryTransport):
38
module = None # Set by load_tests
40
fa_key = ('f-id', 'a-id')
41
fb_key = ('f-id', 'b-id')
42
fc_key = ('f-id', 'c-id')
43
fd_key = ('f-id', 'd-id')
44
fe_key = ('f-id', 'e-id')
45
ff_key = ('f-id', 'f-id')
47
def make_no_graph_texts(self):
    """Set up self.vf and self.ann with two texts added without parents."""
    build_vf = knit.make_pack_factory(False, False, 2)
    self.vf = build_vf(self.get_transport())
    self.ann = self.module.Annotator(self.vf)
    # Both texts are stored with an empty parents tuple.
    texts = (
        (self.fa_key, ['simple\n', 'content\n']),
        (self.fb_key, ['simple\n', 'new content\n']),
    )
    for key, lines in texts:
        self.vf.add_lines(key, (), lines)
54
def make_simple_text(self):
    """Set up self.vf and self.ann with text A and its child text B."""
    # TODO: all we really need is a VersionedFile instance, we'd like to
    #       avoid creating all the intermediate stuff
    pack_factory = knit.make_pack_factory(True, True, 2)
    versioned_files = pack_factory(self.get_transport())
    self.vf = versioned_files
    # The annotator is built before any text exists; this assumes nothing
    # special happens during __init__.
    self.ann = self.module.Annotator(versioned_files)
    #  A    'simple|content|'
    #  |
    #  B    'simple|new content|'
    a_lines = ['simple\n', 'content\n']
    b_lines = ['simple\n', 'new content\n']
    versioned_files.add_lines(self.fa_key, [], a_lines)
    versioned_files.add_lines(self.fb_key, [self.fa_key], b_lines)
69
def make_merge_text(self):
    """Extend the simple A->B history with a sibling C and a merge D.

    C has A as its only parent; D has parents (B, C) and adds a line of
    its own.
    """
    self.make_simple_text()
    #  A     'simple|content|'
    #  |\
    #  B |   'simple|new content|'
    #  | |
    #  | C   'simple|from c|content|'
    #  |/
    #  D     'simple|from c|new content|introduced in merge|'
    c_lines = ['simple\n', 'from c\n', 'content\n']
    d_lines = ['simple\n', 'from c\n', 'new content\n',
               'introduced in merge\n']
    self.vf.add_lines(self.fc_key, [self.fa_key], c_lines)
    self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key], d_lines)
84
def make_common_merge_text(self):
    """Both sides of the merge will have introduced a line.

    C (child of A) carries the same text as B, and D merges B and C
    without changing anything.
    """
    self.make_simple_text()
    #  A     'simple|content|'
    #  |\
    #  B |   'simple|new content|'
    #  | |
    #  | C   'simple|new content|'
    #  |/
    #  D     'simple|new content|'
    additions = (
        (self.fc_key, [self.fa_key]),
        (self.fd_key, [self.fb_key, self.fc_key]),
    )
    for key, parents in additions:
        # A fresh list per call, exactly as separate literals would give.
        self.vf.add_lines(key, parents, ['simple\n', 'new content\n'])
99
def make_many_way_common_merge_text(self):
    """Build a history where B, C and E each introduce the same line.

    D merges B and C, and F merges D and E; every text after A reads
    'simple|new content|'.
    """
    self.make_simple_text()
    #  A-.    'simple|content|'
    #  |\ \
    #  B | |  'simple|new content|'
    #  | | |
    #  | C |  'simple|new content|'
    #  |/  |
    #  D   |  'simple|new content|'
    #  |   |
    #  |   E  'simple|new content|'
    #  |  /
    #  F-'    'simple|new content|'
    additions = (
        (self.fc_key, [self.fa_key]),
        (self.fd_key, [self.fb_key, self.fc_key]),
        (self.fe_key, [self.fa_key]),
        (self.ff_key, [self.fd_key, self.fe_key]),
    )
    for key, parents in additions:
        self.vf.add_lines(key, parents, ['simple\n', 'new content\n'])
121
def make_merge_and_restored_text(self):
    """Build a history where C reverts B's change and D merges A with C."""
    self.make_simple_text()
    #  A      'simple|content|'
    #  |\
    #  B |    'simple|new content|'
    #  | |
    #  C |    'simple|content|'     # reverted to A
    #   \|
    #    D    'simple|content|'
    restored_in_c = ['simple\n', 'content\n']
    merged_in_d = ['simple\n', 'content\n']
    # c puts the second line back to what 'a' had
    self.vf.add_lines(self.fc_key, [self.fb_key], restored_in_c)
    # d merges 'a' and 'c', so both claim to have last modified that line
    self.vf.add_lines(self.fd_key, [self.fa_key, self.fc_key], merged_in_d)
137
def assertAnnotateEqual(self, expected_annotation, key, exp_text=None):
    """Check both the annotation and the text that annotate(key) returns.

    :param expected_annotation: The annotation list expected for key.
    :param key: The key handed to self.ann.annotate().
    :param exp_text: The expected full text.  When None, the text is read
        back from self.vf as the 'fulltext' of key.
    """
    annotation, lines = self.ann.annotate(key)
    self.assertEqual(expected_annotation, annotation)
    # Only consult the versioned file when the caller gave no text;
    # otherwise the caller's exp_text would be silently overwritten.
    if exp_text is None:
        record = self.vf.get_record_stream([key], 'unordered', True).next()
        exp_text = record.get_bytes_as('fulltext')
    self.assertEqualDiff(exp_text, ''.join(lines))
145
def test_annotate_missing(self):
    """annotate() raises RevisionNotPresent for a key that was never added."""
    self.make_simple_text()
    absent_key = ('not', 'present')
    self.assertRaises(errors.RevisionNotPresent,
                      self.ann.annotate, absent_key)
150
def test_annotate_simple(self):
    """A's lines are all attributed to A; B only owns the line it changed."""
    self.make_simple_text()
    root, child = self.fa_key, self.fb_key
    self.assertAnnotateEqual([(root,), (root,)], root)
    self.assertAnnotateEqual([(root,), (child,)], child)
155
def test_annotate_merge_text(self):
    """Each line of the merge D is attributed to the text that added it."""
    self.make_merge_text()
    # D is the only text with four lines, so it is the key being annotated:
    # 'simple' from A, 'from c' from C, 'new content' from B, and the last
    # line from D itself.
    self.assertAnnotateEqual([(self.fa_key,), (self.fc_key,),
                              (self.fb_key,), (self.fd_key,)],
                             self.fd_key)
161
def test_annotate_common_merge_text(self):
    """A line introduced on both sides of a merge is attributed to both."""
    self.make_common_merge_text()
    # D merges B and C, which both introduced 'new content'.
    self.assertAnnotateEqual([(self.fa_key,), (self.fb_key, self.fc_key)],
                             self.fd_key)
166
def test_annotate_many_way_common_merge_text(self):
    """A line introduced by three ancestors is attributed to all three."""
    self.make_many_way_common_merge_text()
    # F is the only text that has B, C and E among its ancestors.
    self.assertAnnotateEqual([(self.fa_key,),
                              (self.fb_key, self.fc_key, self.fe_key)],
                             self.ff_key)
172
def test_annotate_merge_and_restored(self):
    """A restored line is attributed to both the original and the revert."""
    self.make_merge_and_restored_text()
    # D merges A and C; C put back the line that A originally had.
    self.assertAnnotateEqual([(self.fa_key,), (self.fa_key, self.fc_key)],
                             self.fd_key)
177
def test_annotate_flat_simple(self):
    """annotate_flat() pairs each line with the single key that owns it."""
    self.make_simple_text()
    flat_a = [
        (self.fa_key, 'simple\n'),
        (self.fa_key, 'content\n'),
    ]
    self.assertEqual(flat_a, self.ann.annotate_flat(self.fa_key))
    flat_b = [
        (self.fa_key, 'simple\n'),
        (self.fb_key, 'new content\n'),
    ]
    self.assertEqual(flat_b, self.ann.annotate_flat(self.fb_key))
186
def test_annotate_flat_merge_and_restored_text(self):
    """The restored line is reported as coming from C rather than A."""
    self.make_merge_and_restored_text()
    # fc is a simple dominator of fa
    expected = [
        (self.fa_key, 'simple\n'),
        (self.fc_key, 'content\n'),
    ]
    self.assertEqual(expected, self.ann.annotate_flat(self.fd_key))
193
def test_annotate_flat_common_merge_text(self):
    """annotate_flat() resolves a two-way tie to the lowest key.

    This test used to be called test_annotate_common_merge_text, the same
    name as an earlier test in this class, so the earlier definition was
    replaced and never ran.  The 'flat' prefix matches the other
    annotate_flat tests.
    """
    self.make_common_merge_text()
    # there is no common point, so we just pick the lexicographical lowest
    # and 'b-id' comes before 'c-id'
    self.assertEqual([(self.fa_key, 'simple\n'),
                      (self.fb_key, 'new content\n'),
                     ], self.ann.annotate_flat(self.fd_key))
201
def test_annotate_flat_many_way_common_merge_text(self):
    """annotate_flat() resolves a three-way tie to a single key.

    This test used to be called test_annotate_many_way_common_merge_text,
    the same name as an earlier test in this class, so the earlier
    definition was replaced and never ran.  The 'flat' prefix matches the
    other annotate_flat tests.
    """
    self.make_many_way_common_merge_text()
    self.assertEqual([(self.fa_key, 'simple\n'),
                      (self.fb_key, 'new content\n')],
                     self.ann.annotate_flat(self.ff_key))
207
def test_annotate_flat_respects_break_ann_tie(self):
    """annotate_flat() resolves ties via annotate._break_annotation_tie.

    The module-level tiebreaker is swapped for one that records its
    arguments and prefers the larger key, then restored afterwards.
    """
    tiebreaker = annotate._break_annotation_tie
    calls = []

    def custom_tiebreaker(annotated_lines):
        self.assertEqual(2, len(annotated_lines))
        left = annotated_lines[0]
        self.assertEqual(2, len(left))
        self.assertEqual('new content\n', left[1])
        right = annotated_lines[1]
        self.assertEqual(2, len(right))
        self.assertEqual('new content\n', right[1])
        calls.append((left[0], right[0]))
        # Our custom tiebreaker takes the *largest* value, rather than
        # the *smallest* value
        if left[0] < right[0]:
            return right
        return left

    annotate._break_annotation_tie = custom_tiebreaker
    try:
        self.make_many_way_common_merge_text()
        self.assertEqual([(self.fa_key, 'simple\n'),
                          (self.fe_key, 'new content\n')],
                         self.ann.annotate_flat(self.ff_key))
        self.assertEqual([(self.fe_key, self.fc_key),
                          (self.fe_key, self.fb_key)], calls)
    finally:
        # Restore the real tiebreaker even when an assertion above fails,
        # so the replacement cannot leak into other tests.
        annotate._break_annotation_tie = tiebreaker
237
def test_needed_keys_simple(self):
    """_get_needed_keys(B) asks for A and B and counts one child each."""
    self.make_simple_text()
    needed, ann_keys = self.ann._get_needed_keys(self.fb_key)
    self.assertEqual([self.fa_key, self.fb_key], sorted(needed))
    expected_children = {self.fa_key: 1, self.fb_key: 1}
    self.assertEqual(expected_children, self.ann._num_needed_children)
    self.assertEqual(set(), ann_keys)
245
def test_needed_keys_many(self):
    """_get_needed_keys(F) asks for every text in the many-way graph."""
    self.make_many_way_common_merge_text()
    keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
    self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
                      self.fd_key, self.fe_key, self.ff_key,
                     ], sorted(keys))
    # A is a parent of B, C and E; every other text is needed once.
    # NOTE(review): only the fa_key entry survived in the damaged source;
    # the rest follow from the graph and the requested-key count of 1 seen
    # in test_needed_keys_simple -- confirm against the implementation.
    self.assertEqual({self.fa_key: 3,
                      self.fb_key: 1,
                      self.fc_key: 1,
                      self.fd_key: 1,
                      self.fe_key: 1,
                      self.ff_key: 1,
                     }, self.ann._num_needed_children)
    self.assertEqual(set(), ann_keys)
260
def test_needed_keys_with_special_text(self):
    """A special text is reported in ann_keys, its ancestors in keys."""
    self.make_many_way_common_merge_text()
    spec_key = ('f-id', revision.CURRENT_REVISION)
    spec_text = 'simple\nnew content\nlocally modified\n'
    self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
                              spec_text)
    keys, ann_keys = self.ann._get_needed_keys(spec_key)
    # NOTE(review): the closing of this list was lost in the damaged
    # source; it is compared with sorted(keys) as in the sibling
    # test_needed_keys_* tests.
    self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
                      self.fd_key, self.fe_key,
                     ], sorted(keys))
    self.assertEqual([spec_key], sorted(ann_keys))
272
def test_needed_keys_with_parent_texts(self):
    """Only F is needed once D and E are already in the caches."""
    self.make_many_way_common_merge_text()
    # If 'D' and 'E' are already annotated, we don't need to extract all
    # of their ancestors again.
    #  D   |  'simple|new content|'
    #  |   |
    #  |   E  'simple|new content|'
    #  |  /
    #  F-'    'simple|new content|'
    # NOTE(review): the first entry of D's annotations, all of E's, and
    # the fe_key/ff_key counts below were lost in the damaged source; they
    # are rebuilt from the graph -- confirm.
    self.ann._parent_map[self.fd_key] = (self.fb_key, self.fc_key)
    self.ann._text_cache[self.fd_key] = ['simple\n', 'new content\n']
    self.ann._annotations_cache[self.fd_key] = [
        (self.fa_key,),
        (self.fb_key, self.fc_key),
        ]
    self.ann._parent_map[self.fe_key] = (self.fa_key,)
    self.ann._text_cache[self.fe_key] = ['simple\n', 'new content\n']
    self.ann._annotations_cache[self.fe_key] = [
        (self.fa_key,),
        (self.fe_key,),
        ]
    keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
    self.assertEqual([self.ff_key], sorted(keys))
    self.assertEqual({self.fd_key: 1,
                      self.fe_key: 1,
                      self.ff_key: 1,
                     }, self.ann._num_needed_children)
    self.assertEqual([], sorted(ann_keys))
301
def test_record_annotation_removes_texts(self):
    """Recording a text's last needed child drops it from the caches."""
    self.make_many_way_common_merge_text()
    # Populate the caches
    for x in self.ann._get_needed_texts(self.ff_key):
        pass
    # NOTE(review): in each expected dict only the fa_key entry survived
    # in the damaged source.  The other entries are rebuilt from the graph
    # (one count per child, one for the requested key); the zero counts in
    # the last dict are the least certain -- confirm against the
    # implementation.
    self.assertEqual({self.fa_key: 3,
                      self.fb_key: 1,
                      self.fc_key: 1,
                      self.fd_key: 1,
                      self.fe_key: 1,
                      self.ff_key: 1,
                     }, self.ann._num_needed_children)
    self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
                      self.fd_key, self.fe_key, self.ff_key,
                     ], sorted(self.ann._text_cache.keys()))
    self.ann._record_annotation(self.fa_key, [], [])
    self.ann._record_annotation(self.fb_key, [self.fa_key], [])
    # B has been recorded, so A is still waiting on C and E.
    self.assertEqual({self.fa_key: 2,
                      self.fb_key: 1,
                      self.fc_key: 1,
                      self.fd_key: 1,
                      self.fe_key: 1,
                      self.ff_key: 1,
                     }, self.ann._num_needed_children)
    self.assertTrue(self.fa_key in self.ann._text_cache)
    self.assertTrue(self.fa_key in self.ann._annotations_cache)
    self.ann._record_annotation(self.fc_key, [self.fa_key], [])
    self.ann._record_annotation(self.fd_key, [self.fb_key, self.fc_key], [])
    # D was the only child of B and C, so both are finished with; A is
    # still needed by E.
    self.assertEqual({self.fa_key: 1,
                      self.fb_key: 0,
                      self.fc_key: 0,
                      self.fd_key: 1,
                      self.fe_key: 1,
                      self.ff_key: 1,
                     }, self.ann._num_needed_children)
    self.assertTrue(self.fa_key in self.ann._text_cache)
    self.assertTrue(self.fa_key in self.ann._annotations_cache)
    self.assertFalse(self.fb_key in self.ann._text_cache)
    self.assertFalse(self.fb_key in self.ann._annotations_cache)
    self.assertFalse(self.fc_key in self.ann._text_cache)
    self.assertFalse(self.fc_key in self.ann._annotations_cache)
343
def test_annotate_special_text(self):
    """A text registered with add_special_text() can be annotated."""
    # Things like WT and PreviewTree want to annotate an arbitrary text
    # ('current:') so we need a way to add that to the group of files to be
    # annotated.
    self.make_many_way_common_merge_text()
    #  A-.    'simple|content|'
    #  |\ \
    #  B | |  'simple|new content|'
    #  | | |
    #  | C |  'simple|new content|'
    #  |/  |
    #  D   |  'simple|new content|'
    #  |   |
    #  |   E  'simple|new content|'
    #  |  /
    #  SPEC   'simple|new content|locally modified|'
    spec_key = ('f-id', revision.CURRENT_REVISION)
    spec_text = 'simple\nnew content\nlocally modified\n'
    self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
                              spec_text)
    # exp_text is passed explicitly because the special text is registered
    # on the annotator, not added to self.vf.
    # NOTE(review): the third annotation entry and the trailing arguments
    # were lost in the damaged source and are rebuilt here -- confirm.
    self.assertAnnotateEqual([(self.fa_key,),
                              (self.fb_key, self.fc_key, self.fe_key),
                              (spec_key,),
                             ], spec_key,
                             exp_text=spec_text)
369
def test_no_graph(self):
370
self.make_no_graph_texts()
371
self.assertAnnotateEqual([(self.fa_key,),
374
self.assertAnnotateEqual([(self.fb_key,),