1
# Copyright (C) 2009 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
"""Tests for Annotators."""
29
def load_tests(standard_tests, module, loader):
    """Parameterize the tests over every available Annotator implementation.

    :param standard_tests: The tests to be multiplied by scenario.
    :param module: Unused here; part of the load_tests signature.
    :param loader: Test loader used to build the suite and, when needed, to
        load the placeholder test.
    :return: The suite produced by tests.multiply_tests().
    """
    scenarios = [
        ('python', {'module': _annotator_py}),
    ]
    suite = loader.suiteClass()
    if CompiledAnnotator.available():
        from bzrlib import _annotator_pyx
        scenarios.append(('C', {'module': _annotator_pyx}))
    else:
        # the compiled module isn't available, so we add a failing test
        class FailWithoutFeature(tests.TestCase):
            def test_fail(self):
                self.requireFeature(CompiledAnnotator)
        suite.addTest(loader.loadTestsFromTestCase(FailWithoutFeature))
    # The loader can only use the parameterized suite if it is handed back.
    return tests.multiply_tests(standard_tests, scenarios, suite)
48
class _CompiledAnnotator(tests.Feature):
    """Feature describing the optional bzrlib._annotator_pyx extension."""

    def _probe(self):
        # NOTE(review): assumes tests.Feature.available() delegates to
        # _probe() -- confirm against bzrlib.tests.Feature.
        # Import lazily and trap ImportError so that a missing extension
        # marks the feature unavailable instead of breaking this module.
        try:
            import bzrlib._annotator_pyx
        except ImportError:
            return False
        return True

    def feature_name(self):
        """Return the name of the module this feature stands for."""
        return 'bzrlib._annotator_pyx'
60
# Shared feature instance; load_tests() calls .available() on it and the
# placeholder test passes it to requireFeature().
CompiledAnnotator = _CompiledAnnotator()
63
class TestAnnotator(tests.TestCaseWithMemoryTransport):
# Tests for an Annotator implementation; the implementation under test is
# reached through the ``module`` attribute.
65
module = None # Set by load_tests
67
# Keys of the texts used by the fixtures below: each is a 2-tuple whose first
# element is always 'f-id' and whose second element names the text (A..F).
fa_key = ('f-id', 'a-id')
68
fb_key = ('f-id', 'b-id')
69
fc_key = ('f-id', 'c-id')
70
fd_key = ('f-id', 'd-id')
71
fe_key = ('f-id', 'e-id')
72
ff_key = ('f-id', 'f-id')
74
def make_no_graph_texts(self):
    """Create self.vf and self.ann and store texts A and B without parents.

    The store comes from knit.make_pack_factory(False, False, 2); both texts
    are added with an empty parent tuple.
    """
    build_store = knit.make_pack_factory(False, False, 2)
    self.vf = build_store(self.get_transport())
    self.ann = self.module.Annotator(self.vf)
    self.vf.add_lines(self.fa_key, (), ['simple\n', 'content\n'])
    self.vf.add_lines(self.fb_key, (), ['simple\n', 'new content\n'])
81
def make_simple_text(self):
    """Create self.vf and self.ann, then store text A and its child B.

    A is 'simple|content|' with no parents; B is 'simple|new content|' with
    A as its only parent.
    """
    # TODO: all we really need is a VersionedFile instance, we'd like to
    #       avoid creating all the intermediate stuff
    build_store = knit.make_pack_factory(True, True, 2)
    self.vf = build_store(self.get_transport())
    # The Annotator is created before any text is added; this assumes
    # nothing special happens during its __init__.
    self.ann = self.module.Annotator(self.vf)
    base_lines = ['simple\n', 'content\n']
    self.vf.add_lines(self.fa_key, [], base_lines)
    child_lines = ['simple\n', 'new content\n']
    self.vf.add_lines(self.fb_key, [self.fa_key], child_lines)
96
def make_merge_text(self):
    """Extend the simple history with a side branch C and a merge D.

    C ('simple|from c|content|') has A as parent.  D
    ('simple|from c|new content|introduced in merge|') has parents B and C.
    """
    self.make_simple_text()
    branch_lines = ['simple\n', 'from c\n', 'content\n']
    self.vf.add_lines(self.fc_key, [self.fa_key], branch_lines)
    merged_lines = [
        'simple\n',
        'from c\n',
        'new content\n',
        'introduced in merge\n',
    ]
    self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key], merged_lines)
111
def make_common_merge_text(self):
    """Store C and D so that both merge parents introduced the same line.

    C (parent A) and D (parents B and C) both read 'simple|new content|',
    which is also the content of B.
    """
    self.make_simple_text()
    additions = (
        (self.fc_key, [self.fa_key]),
        (self.fd_key, [self.fb_key, self.fc_key]),
    )
    for new_key, parent_keys in additions:
        # A fresh list per call, exactly as with separate literals.
        self.vf.add_lines(new_key, parent_keys, ['simple\n', 'new content\n'])
126
def make_many_way_common_merge_text(self):
    """Store C, D, E and F, all reading 'simple|new content|'.

    Parents: C <- A; D <- B, C; E <- A; F <- D, E.
    """
    self.make_simple_text()
    additions = (
        (self.fc_key, [self.fa_key]),
        (self.fd_key, [self.fb_key, self.fc_key]),
        (self.fe_key, [self.fa_key]),
        (self.ff_key, [self.fd_key, self.fe_key]),
    )
    for new_key, parent_keys in additions:
        # A fresh list per call, exactly as with separate literals.
        self.vf.add_lines(new_key, parent_keys, ['simple\n', 'new content\n'])
148
def make_merge_and_restored_text(self):
    """Store C, which restores A's content on top of B, and a merge D.

    C (parent B) and D (parents A and C) both read 'simple|content|'.
    """
    self.make_simple_text()
    additions = (
        # c reverts back to 'a' for the new content line
        (self.fc_key, [self.fb_key]),
        # d merges 'a' and 'c', to find both claim last modified
        (self.fd_key, [self.fa_key, self.fc_key]),
    )
    for new_key, parent_keys in additions:
        # A fresh list per call, exactly as with separate literals.
        self.vf.add_lines(new_key, parent_keys, ['simple\n', 'content\n'])
164
def assertAnnotateEqual(self, expected_annotation, key, exp_text=None):
    """Assert that annotating ``key`` gives the expected annotation and text.

    :param expected_annotation: The annotation expected as the first item
        returned by self.ann.annotate(key).
    :param key: The key to annotate.
    :param exp_text: The expected text as one string.  When None, the
        fulltext recorded for ``key`` in self.vf is used instead.
    """
    annotation, lines = self.ann.annotate(key)
    self.assertEqual(expected_annotation, annotation)
    if exp_text is None:
        # Only fall back to the versioned file when the caller gave no text,
        # so a caller-supplied exp_text is honoured instead of overwritten.
        record = self.vf.get_record_stream([key], 'unordered', True).next()
        exp_text = record.get_bytes_as('fulltext')
    self.assertEqualDiff(exp_text, ''.join(lines))
172
def test_annotate_missing(self):
    """Annotating a key that was never added raises RevisionNotPresent."""
    self.make_simple_text()
    absent_key = ('not', 'present')
    self.assertRaises(
        errors.RevisionNotPresent, self.ann.annotate, absent_key)
177
def test_annotate_simple(self):
    """Check the annotations of A and of B in the two-text history."""
    self.make_simple_text()
    from_a = (self.fa_key,)
    from_b = (self.fb_key,)
    self.assertAnnotateEqual([from_a, from_a], self.fa_key)
    self.assertAnnotateEqual([from_a, from_b], self.fb_key)
182
def test_annotate_merge_text(self):
    """Check the annotation of the merge text D built by make_merge_text."""
    self.make_merge_text()
    # D is the only text with four lines, so it is the key being annotated.
    self.assertAnnotateEqual([(self.fa_key,), (self.fc_key,),
                              (self.fb_key,), (self.fd_key,)],
                             self.fd_key)
188
def test_annotate_common_merge_text(self):
    """The line both merge parents introduced is annotated with both keys."""
    self.make_common_merge_text()
    # D is the text that merges B and C, so it is the key being annotated.
    self.assertAnnotateEqual([(self.fa_key,), (self.fb_key, self.fc_key)],
                             self.fd_key)
193
def test_annotate_many_way_common_merge_text(self):
    """A line introduced on three branches is annotated with all three."""
    self.make_many_way_common_merge_text()
    # F is the only text descended from B, C and E, so it is the key here.
    self.assertAnnotateEqual([(self.fa_key,),
                              (self.fb_key, self.fc_key, self.fe_key)],
                             self.ff_key)
199
def test_annotate_merge_and_restored(self):
    """A restored line is annotated with both the original and the revert."""
    self.make_merge_and_restored_text()
    # D is the text that merges A and C, so it is the key being annotated.
    self.assertAnnotateEqual([(self.fa_key,), (self.fa_key, self.fc_key)],
                             self.fd_key)
204
def test_annotate_flat_simple(self):
    """annotate_flat pairs each line of A and of B with a single key."""
    self.make_simple_text()
    expected_a = [
        (self.fa_key, 'simple\n'),
        (self.fa_key, 'content\n'),
    ]
    self.assertEqual(expected_a, self.ann.annotate_flat(self.fa_key))
    expected_b = [
        (self.fa_key, 'simple\n'),
        (self.fb_key, 'new content\n'),
    ]
    self.assertEqual(expected_b, self.ann.annotate_flat(self.fb_key))
213
def test_annotate_flat_merge_and_restored_text(self):
    """The restored line of D is attributed to C by annotate_flat."""
    self.make_merge_and_restored_text()
    # fc is a simple dominator of fa
    expected = [
        (self.fa_key, 'simple\n'),
        (self.fc_key, 'content\n'),
    ]
    self.assertEqual(expected, self.ann.annotate_flat(self.fd_key))
220
def test_annotate_flat_common_merge_text(self):
    """annotate_flat picks one key for a line both merge parents introduced.

    This test used to be named test_annotate_common_merge_text, the same as
    an earlier method of this class, so it replaced that method and only one
    of the two was ever run.  The 'flat' infix keeps both tests active.
    """
    self.make_common_merge_text()
    # there is no common point, so we just pick the lexicographical lowest
    # and 'b-id' comes before 'c-id'
    self.assertEqual([(self.fa_key, 'simple\n'),
                      (self.fb_key, 'new content\n'),
                     ], self.ann.annotate_flat(self.fd_key))
228
def test_annotate_flat_many_way_common_merge_text(self):
    """annotate_flat picks one key for a line introduced on three branches.

    This test used to be named test_annotate_many_way_common_merge_text, the
    same as an earlier method of this class, so it replaced that method and
    only one of the two was ever run.  The 'flat' infix keeps both active.
    """
    self.make_many_way_common_merge_text()
    self.assertEqual([(self.fa_key, 'simple\n'),
                      (self.fb_key, 'new content\n')],
                     self.ann.annotate_flat(self.ff_key))
234
def test_annotate_flat_respects_break_ann_tie(self):
    """annotate_flat resolves ties via annotate._break_annotation_tie.

    A custom tie breaker is installed for the duration of the test; it
    records every pair it is asked about and is always uninstalled again.
    """
    tiebreaker = annotate._break_annotation_tie
    # (left key, right key) for each invocation of the custom tie breaker.
    calls = []

    def custom_tiebreaker(annotated_lines):
        self.assertEqual(2, len(annotated_lines))
        left = annotated_lines[0]
        self.assertEqual(2, len(left))
        self.assertEqual('new content\n', left[1])
        right = annotated_lines[1]
        self.assertEqual(2, len(right))
        self.assertEqual('new content\n', right[1])
        calls.append((left[0], right[0]))
        # Our custom tiebreaker takes the *largest* value, rather than
        # the *smallest* value
        if left[0] < right[0]:
            return right
        return left

    annotate._break_annotation_tie = custom_tiebreaker
    try:
        self.make_many_way_common_merge_text()
        self.assertEqual([(self.fa_key, 'simple\n'),
                          (self.fe_key, 'new content\n')],
                         self.ann.annotate_flat(self.ff_key))
        self.assertEqual([(self.fe_key, self.fc_key),
                          (self.fe_key, self.fb_key)], calls)
    finally:
        # Restore the module-level hook even when an assertion fails, so
        # other tests never see the custom tie breaker.
        annotate._break_annotation_tie = tiebreaker
264
def test_needed_keys_simple(self):
    """_get_needed_keys(B) needs A and B and has no annotation-only keys."""
    self.make_simple_text()
    needed, annotated = self.ann._get_needed_keys(self.fb_key)
    self.assertEqual([self.fa_key, self.fb_key], sorted(needed))
    expected_children = {self.fa_key: 1, self.fb_key: 1}
    self.assertEqual(expected_children, self.ann._num_needed_children)
    self.assertEqual(set(), annotated)
272
def test_needed_keys_many(self):
# Checks the result of _get_needed_keys() for F in the many-way merge
# history, along with the _num_needed_children bookkeeping it leaves behind.
273
self.make_many_way_common_merge_text()
274
keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
275
self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
276
self.fd_key, self.fe_key, self.ff_key,
# NOTE(review): listing line 277, which closes this assertion, is absent
# from this view -- restore it from the original file.
278
self.assertEqual({self.fa_key: 3,
# NOTE(review): listing lines 279-283 (the remaining expected entries) are
# absent from this view -- restore them from the original file.
284
}, self.ann._num_needed_children)
285
self.assertEqual(set(), ann_keys)
287
def test_needed_keys_with_special_text(self):
    """A special text needs its ancestors and is itself an annotation key."""
    self.make_many_way_common_merge_text()
    spec_key = ('f-id', revision.CURRENT_REVISION)
    spec_text = 'simple\nnew content\nlocally modified\n'
    # Register the text under spec_key with D and E as its parents.
    self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
                              spec_text)
    keys, ann_keys = self.ann._get_needed_keys(spec_key)
    self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
                      self.fd_key, self.fe_key,
                     ], sorted(keys))
    self.assertEqual([spec_key], sorted(ann_keys))
299
def test_needed_keys_with_parent_texts(self):
# Pre-populates the annotator's parent map, text cache and annotation cache
# for D and E, then checks that _get_needed_keys(F) asks only for F.
300
self.make_many_way_common_merge_text()
301
# If 'D' and 'E' are already annotated, we don't need to extract all
# NOTE(review): the rest of this comment (listing line 302) is absent from
# this view.
303
# D | 'simple|new content|'
305
# | E 'simple|new content|'
307
# F-' 'simple|new content|'
308
self.ann._parent_map[self.fd_key] = (self.fb_key, self.fc_key)
309
self.ann._text_cache[self.fd_key] = ['simple\n', 'new content\n']
310
self.ann._annotations_cache[self.fd_key] = [
# NOTE(review): listing line 311 (the first entry) is absent from this view.
312
(self.fb_key, self.fc_key),
# NOTE(review): listing line 313 (closing this list) is absent from this view.
314
self.ann._parent_map[self.fe_key] = (self.fa_key,)
315
self.ann._text_cache[self.fe_key] = ['simple\n', 'new content\n']
316
self.ann._annotations_cache[self.fe_key] = [
# NOTE(review): listing lines 317-319 (the entries and the closing bracket)
# are absent from this view -- restore them from the original file.
320
keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
321
self.assertEqual([self.ff_key], sorted(keys))
322
self.assertEqual({self.fd_key: 1,
# NOTE(review): listing lines 323-324 (the remaining expected entries) are
# absent from this view.
325
}, self.ann._num_needed_children)
326
self.assertEqual([], sorted(ann_keys))
328
def test_record_annotation_removes_texts(self):
# Records annotations for A, B, C and D in turn and checks how
# _num_needed_children, _text_cache and _annotations_cache change.
329
self.make_many_way_common_merge_text()
330
# Populate the caches
331
for x in self.ann._get_needed_texts(self.ff_key):
# NOTE(review): the loop body (listing line 332) is absent from this view;
# the loop appears to exist only to exhaust the iterator.
333
self.assertEqual({self.fa_key: 3,
# NOTE(review): listing lines 334-338 (the remaining expected entries) are
# absent from this view.
339
}, self.ann._num_needed_children)
340
self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
341
self.fd_key, self.fe_key, self.ff_key,
342
], sorted(self.ann._text_cache.keys()))
343
self.ann._record_annotation(self.fa_key, [], [])
344
self.ann._record_annotation(self.fb_key, [self.fa_key], [])
345
# After B is recorded the expected count for A has dropped from 3 to 2.
self.assertEqual({self.fa_key: 2,
# NOTE(review): listing lines 346-350 (the remaining expected entries) are
# absent from this view.
351
}, self.ann._num_needed_children)
352
self.assertTrue(self.fa_key in self.ann._text_cache)
353
self.assertTrue(self.fa_key in self.ann._annotations_cache)
354
self.ann._record_annotation(self.fc_key, [self.fa_key], [])
355
self.ann._record_annotation(self.fd_key, [self.fb_key, self.fc_key], [])
356
self.assertEqual({self.fa_key: 1,
# NOTE(review): listing lines 357-361 (the remaining expected entries) are
# absent from this view.
362
}, self.ann._num_needed_children)
363
# A must still be cached, while B and C must be gone from both caches.
self.assertTrue(self.fa_key in self.ann._text_cache)
364
self.assertTrue(self.fa_key in self.ann._annotations_cache)
365
self.assertFalse(self.fb_key in self.ann._text_cache)
366
self.assertFalse(self.fb_key in self.ann._annotations_cache)
367
self.assertFalse(self.fc_key in self.ann._text_cache)
368
self.assertFalse(self.fc_key in self.ann._annotations_cache)
370
def test_annotate_special_text(self):
371
# Things like WT and PreviewTree want to annotate an arbitrary text
372
# ('current:') so we need a way to add that to the group of files to be
# annotated.
374
self.make_many_way_common_merge_text()
375
# A-. 'simple|content|'
377
# B | | 'simple|new content|'
379
# | C | 'simple|new content|'
381
# D | 'simple|new content|'
383
# | E 'simple|new content|'
385
# SPEC 'simple|new content|locally modified|'
386
spec_key = ('f-id', revision.CURRENT_REVISION)
387
spec_text = 'simple\nnew content\nlocally modified\n'
388
self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
# NOTE(review): listing line 389 (the rest of this call) is absent from this
# view; spec_text is otherwise unused here, so it is presumably the missing
# argument -- confirm against the original file.
390
self.assertAnnotateEqual([(self.fa_key,),
391
(self.fb_key, self.fc_key, self.fe_key),
# NOTE(review): the rest of this assertion (listing lines 392-394) is absent
# from this view -- restore it from the original file.
396
def test_no_graph(self):
397
self.make_no_graph_texts()
398
self.assertAnnotateEqual([(self.fa_key,),
401
self.assertAnnotateEqual([(self.fb_key,),