94
77
if len(revno_str) > max_revno_len:
95
78
revno_str = revno_str[:max_revno_len-1] + '>'
96
79
anno = "%-*s %-7s " % (max_revno_len, revno_str, author[:7])
97
if anno.lstrip() == "" and full:
101
except UnicodeEncodeError:
102
# cmd_annotate should be passing in an 'exact' object, which means
103
# we have a direct handle to sys.stdout or equivalent. It may not
104
# be able to handle the exact Unicode characters, but 'annotate' is
105
# a user function (non-scripting), so shouldn't die because of
106
# unrepresentable annotation characters. So encode using 'replace',
107
# and write them again.
108
to_file.write(anno.encode(encoding, 'replace'))
109
to_file.write('| %s\n' % (text,))
113
def _annotations(repo, file_id, rev_id):
114
"""Return the list of (origin_revision_id, line_text) for a revision of a file in a repository."""
115
annotations = repo.texts.annotate((file_id, rev_id))
117
return [(key[-1], line) for (key, line) in annotations]
81
if anno.lstrip() == "" and full: anno = prevanno
82
print >>to_file, '%s| %s' % (anno, text)
120
86
def _annotate_file(branch, rev_id, file_id):
121
87
"""Yield the origins for each line of a file.
123
This includes detailed information, such as the author name, and
89
This includes detailed information, such as the committer name, and
124
90
date string for the commit, rather than just the revision id.
126
92
revision_id_to_revno = branch.get_revision_id_to_revno_map()
127
annotations = _annotations(branch.repository, file_id, rev_id)
93
w = branch.repository.weave_store.get_weave(file_id,
94
branch.repository.get_transaction())
128
95
last_origin = None
96
annotations = list(w.annotate_iter(rev_id))
129
97
revision_ids = set(o for o, t in annotations)
130
98
revision_ids = [o for o in revision_ids if
131
99
branch.repository.has_revision(o)]
156
124
yield (revno_str, author, date_str, origin, text)
159
def reannotate(parents_lines, new_lines, new_revision_id,
160
_left_matching_blocks=None,
161
heads_provider=None):
127
def reannotate(parents_lines, new_lines, new_revision_id):
162
128
"""Create a new annotated version from new lines and parent annotations.
164
130
:param parents_lines: List of annotated lines for all parents
165
131
:param new_lines: The un-annotated new lines
166
132
:param new_revision_id: The revision-id to associate with new lines
167
133
(will often be CURRENT_REVISION)
168
:param left_matching_blocks: a hint about which areas are common
169
between the text and its left-hand-parent. The format is
170
the SequenceMatcher.get_matching_blocks format
171
(start_left, start_right, length_of_match).
172
:param heads_provider: An object which provids a .heads() call to resolve
173
if any revision ids are children of others.
174
If None, then any ancestry disputes will be resolved with
177
if len(parents_lines) == 0:
178
lines = [(new_revision_id, line) for line in new_lines]
179
elif len(parents_lines) == 1:
180
lines = _reannotate(parents_lines[0], new_lines, new_revision_id,
181
_left_matching_blocks)
182
elif len(parents_lines) == 2:
183
left = _reannotate(parents_lines[0], new_lines, new_revision_id,
184
_left_matching_blocks)
185
lines = _reannotate_annotated(parents_lines[1], new_lines,
186
new_revision_id, left,
135
if len(parents_lines) == 1:
136
for data in _reannotate(parents_lines[0], new_lines, new_revision_id):
189
reannotations = [_reannotate(parents_lines[0], new_lines,
190
new_revision_id, _left_matching_blocks)]
191
reannotations.extend(_reannotate(p, new_lines, new_revision_id)
192
for p in parents_lines[1:])
139
reannotations = [list(_reannotate(p, new_lines, new_revision_id)) for
194
141
for annos in zip(*reannotations):
195
142
origins = set(a for a, l in annos)
196
144
if len(origins) == 1:
197
# All the parents agree, so just return the first one
198
lines.append(annos[0])
145
yield iter(origins).next(), line
146
elif len(origins) == 2 and new_revision_id in origins:
147
yield (x for x in origins if x != new_revision_id).next(), line
201
if len(origins) == 2 and new_revision_id in origins:
202
origins.remove(new_revision_id)
203
if len(origins) == 1:
204
lines.append((origins.pop(), line))
206
lines.append((new_revision_id, line))
210
def _reannotate(parent_lines, new_lines, new_revision_id,
211
matching_blocks=None):
149
yield new_revision_id, line
152
def _reannotate(parent_lines, new_lines, new_revision_id):
153
plain_parent_lines = [l for r, l in parent_lines]
154
matcher = patiencediff.PatienceSequenceMatcher(None, plain_parent_lines,
213
if matching_blocks is None:
214
plain_parent_lines = [l for r, l in parent_lines]
215
matcher = patiencediff.PatienceSequenceMatcher(None,
216
plain_parent_lines, new_lines)
217
matching_blocks = matcher.get_matching_blocks()
219
for i, j, n in matching_blocks:
157
for i, j, n in matcher.get_matching_blocks():
220
158
for line in new_lines[new_cur:j]:
221
lines.append((new_revision_id, line))
222
lines.extend(parent_lines[i:i+n])
159
yield new_revision_id, line
160
for data in parent_lines[i:i+n]:
227
def _get_matching_blocks(old, new):
228
matcher = patiencediff.PatienceSequenceMatcher(None,
230
return matcher.get_matching_blocks()
233
def _find_matching_unannotated_lines(output_lines, plain_child_lines,
234
child_lines, start_child, end_child,
235
right_lines, start_right, end_right,
236
heads_provider, revision_id):
237
"""Find lines in plain_right_lines that match the existing lines.
239
:param output_lines: Append final annotated lines to this list
240
:param plain_child_lines: The unannotated new lines for the child text
241
:param child_lines: Lines for the child text which have been annotated
243
:param start_child: Position in plain_child_lines and child_lines to start the
245
:param end_child: Last position in plain_child_lines and child_lines to search
247
:param right_lines: The annotated lines for the whole text for the right
249
:param start_right: Position in right_lines to start the match
250
:param end_right: Last position in right_lines to search for a match
251
:param heads_provider: When parents disagree on the lineage of a line, we
252
need to check if one side supersedes the other
253
:param revision_id: The label to give if a line should be labeled 'tip'
255
output_extend = output_lines.extend
256
output_append = output_lines.append
257
# We need to see if any of the unannotated lines match
258
plain_right_subset = [l for a,l in right_lines[start_right:end_right]]
259
plain_child_subset = plain_child_lines[start_child:end_child]
260
match_blocks = _get_matching_blocks(plain_right_subset, plain_child_subset)
264
for right_idx, child_idx, match_len in match_blocks:
265
# All the lines that don't match are just passed along
266
if child_idx > last_child_idx:
267
output_extend(child_lines[start_child + last_child_idx
268
:start_child + child_idx])
269
for offset in xrange(match_len):
270
left = child_lines[start_child+child_idx+offset]
271
right = right_lines[start_right+right_idx+offset]
272
if left[0] == right[0]:
273
# The annotations match, just return the left one
275
elif left[0] == revision_id:
276
# The left parent marked this as unmatched, so let the
277
# right parent claim it
280
# Left and Right both claim this line
281
if heads_provider is None:
282
output_append((revision_id, left[1]))
284
heads = heads_provider.heads((left[0], right[0]))
286
output_append((iter(heads).next(), left[1]))
288
# Both claim different origins, sort lexicographically
289
# so that we always get a stable result.
290
output_append(sorted([left, right])[0])
291
last_child_idx = child_idx + match_len
294
def _reannotate_annotated(right_parent_lines, new_lines, new_revision_id,
295
annotated_lines, heads_provider):
296
"""Update the annotations for a node based on another parent.
298
:param right_parent_lines: A list of annotated lines for the right-hand
300
:param new_lines: The unannotated new lines.
301
:param new_revision_id: The revision_id to attribute to lines which are not
302
present in either parent.
303
:param annotated_lines: A list of annotated lines. This should be the
304
annotation of new_lines based on parents seen so far.
305
:param heads_provider: When parents disagree on the lineage of a line, we
306
need to check if one side supersedes the other.
308
if len(new_lines) != len(annotated_lines):
309
raise AssertionError("mismatched new_lines and annotated_lines")
310
# First compare the newly annotated lines with the right annotated lines.
311
# Lines which were not changed in left or right should match. This tends to
312
# be the bulk of the lines, and they will need no further processing.
314
lines_extend = lines.extend
315
last_right_idx = 0 # The line just after the last match from the right side
317
matching_left_and_right = _get_matching_blocks(right_parent_lines,
319
for right_idx, left_idx, match_len in matching_left_and_right:
320
# annotated lines from last_left_idx to left_idx did not match the lines from
322
# to right_idx, the raw lines should be compared to determine what annotations
324
if last_right_idx == right_idx or last_left_idx == left_idx:
325
# One of the sides is empty, so this is a pure insertion
326
lines_extend(annotated_lines[last_left_idx:left_idx])
328
# We need to see if any of the unannotated lines match
329
_find_matching_unannotated_lines(lines,
330
new_lines, annotated_lines,
331
last_left_idx, left_idx,
333
last_right_idx, right_idx,
336
last_right_idx = right_idx + match_len
337
last_left_idx = left_idx + match_len
338
# If left and right agree on a range, just push that into the output
339
lines_extend(annotated_lines[left_idx:left_idx + match_len])