3
Read in a changeset output, and process it into a Changeset object.
6
import bzrlib, bzrlib.changeset
9
class BadChangeset(Exception): pass
10
class MalformedHeader(BadChangeset): pass
11
class MalformedPatches(BadChangeset): pass
12
class MalformedFooter(BadChangeset): pass
15
"""Now we want to find the filename effected.
16
Unfortunately the filename is written out as
17
repr(filename), which means that it surrounds
18
the name with quotes which may be single or double
19
(single is preferred unless there is a single quote in
20
the filename). And some characters will be escaped.
22
TODO: There has to be some pythonic way of undo-ing the
23
representation of a string rather than using eval.
26
if name[-1] != delimiter:
27
raise BadChangeset('Could not properly parse the'
28
' filename: %r' % name)
29
# We need to handle escaped hexadecimals too.
30
return name[1:-1].replace('\"', '"').replace("\'", "'")
32
class ChangesetInfo(object):
33
"""This is the intermediate class that gets filled out as
42
self.revision_sha1 = None
44
self.precursor_sha1 = None
45
self.precursor_revno = None
50
self.tree_root_id = None
52
self.old_file_ids = None
54
self.actions = [] #this is the list of things that happened
55
self.id2path = {} # A mapping from file id to path name
56
self.path2id = {} # The reverse mapping
57
self.id2parent = {} # A mapping from a given id to it's parent id
61
self.old_id2parent = {}
65
return pprint.pformat(self.__dict__)
67
def create_maps(self):
68
"""Go through the individual id sections, and generate the
69
id2path and path2id maps.
71
# Rather than use an empty path, the changeset code seems
72
# to like to use "./." for the tree root.
73
self.id2path[self.tree_root_id] = './.'
74
self.path2id['./.'] = self.tree_root_id
75
self.id2parent[self.tree_root_id] = bzrlib.changeset.NULL_ID
76
self.old_id2path = self.id2path.copy()
77
self.old_path2id = self.path2id.copy()
78
self.old_id2parent = self.id2parent.copy()
81
for info in self.file_ids:
82
path, f_id, parent_id = info.split('\t')
83
self.id2path[f_id] = path
84
self.path2id[path] = f_id
85
self.id2parent[f_id] = parent_id
87
for info in self.old_file_ids:
88
path, f_id, parent_id = info.split('\t')
89
self.old_id2path[f_id] = path
90
self.old_path2id[path] = f_id
91
self.old_id2parent[f_id] = parent_id
93
def get_changeset(self):
94
"""Create a changeset from the data contained within."""
95
from bzrlib.changeset import Changeset, ChangesetEntry, \
96
PatchApply, ReplaceContents
99
entry = ChangesetEntry(self.tree_root_id,
100
bzrlib.changeset.NULL_ID, './.')
101
cset.add_entry(entry)
102
for info, lines in self.actions:
103
parts = info.split(' ')
106
extra = ' '.join(parts[2:])
107
if action == 'renamed':
108
old_path, new_path = extra.split(' => ')
109
old_path = _unescape(old_path)
110
new_path = _unescape(new_path)
112
new_id = self.path2id[new_path]
113
old_id = self.old_path2id[old_path]
114
assert old_id == new_id
116
new_parent = self.id2parent[new_id]
117
old_parent = self.old_id2parent[old_id]
119
entry = ChangesetEntry(old_id, old_parent, old_path)
120
entry.new_path = new_path
121
entry.new_parent = new_parent
123
entry.contents_change = PatchApply(''.join(lines))
124
elif action == 'removed':
125
old_path = _unescape(extra)
126
old_id = self.old_path2id[old_path]
127
old_parent = self.old_id2parent[old_id]
128
entry = ChangesetEntry(old_id, old_parent, old_path)
129
entry.new_path = None
130
entry.new_parent = None
132
# Technically a removed should be a ReplaceContents()
133
# Where you need to have the old contents
134
# But at most we have a remove style patch.
135
#entry.contents_change = ReplaceContents()
137
elif action == 'added':
138
new_path = _unescape(extra)
139
new_id = self.path2id[new_path]
140
new_parent = self.id2parent[new_id]
141
entry = ChangesetEntry(new_id, new_parent, new_path)
145
# Technically an added should be a ReplaceContents()
146
# Where you need to have the old contents
147
# But at most we have an add style patch.
148
#entry.contents_change = ReplaceContents()
149
entry.contents_change = PatchApply(''.join(lines))
150
elif action == 'modified':
151
new_path = _unescape(extra)
152
new_id = self.path2id[new_path]
153
new_parent = self.id2parent[new_id]
154
entry = ChangesetEntry(new_id, new_parent, new_path)
158
# Technically an added should be a ReplaceContents()
159
# Where you need to have the old contents
160
# But at most we have an add style patch.
161
#entry.contents_change = ReplaceContents()
162
entry.contents_change = PatchApply(''.join(lines))
164
raise BadChangeset('Unrecognized action: %r' % action)
165
cset.add_entry(entry)
168
class ChangesetReader(object):
169
"""This class reads in a changeset from a file, and returns
170
a Changeset object, which can then be applied against a tree.
172
def __init__(self, from_file):
173
"""Read in the changeset from the file.
175
:param from_file: A file-like object (must have iterator support).
177
object.__init__(self)
178
self.from_file = from_file
180
self.info = ChangesetInfo()
181
# We put the actual inventory ids in the footer, so that the patch
182
# is easier to read for humans.
183
# Unfortunately, that means we need to read everything before we
184
# can create a proper changeset.
186
next_line = self._read_patches()
187
if next_line is not None:
188
self._read_footer(next_line)
191
"""Create the actual changeset object.
193
self.info.create_maps()
196
def _read_header(self):
197
"""Read the bzr header"""
198
header = common.get_header()
199
for head_line, line in zip(header, self.from_file):
202
or line[2:-1] != head_line):
203
raise MalformedHeader('Did not read the opening'
204
' header information.')
206
for line in self.from_file:
207
if self._handle_info_line(line) is not None:
210
def _handle_info_line(self, line, in_footer=False):
211
"""Handle reading a single line.
213
This may call itself, in the case that we read_multi,
214
and then had a dangling line on the end.
216
# The bzr header is terminated with a blank line
217
# which does not start with #
222
raise MalformedHeader('Opening bzr header did not start with #')
224
line = line[2:-1] # Remove the '# '
226
return # Ignore blank lines
228
if in_footer and line in ('BEGIN BZR FOOTER', 'END BZR FOOTER'):
231
loc = line.find(': ')
236
value, next_line = self._read_many()
240
value, next_line = self._read_many()
242
raise MalformedHeader('While looking for key: value pairs,'
243
' did not find the colon %r' % (line))
245
key = key.replace(' ', '_')
246
if hasattr(self.info, key):
247
if getattr(self.info, key) is None:
248
setattr(self.info, key, value)
250
raise MalformedHeader('Duplicated Key: %s' % key)
252
# What do we do with a key we don't recognize
253
raise MalformedHeader('Unknown Key: %s' % key)
256
self._handle_info_line(next_line, in_footer=in_footer)
258
def _read_many(self):
259
"""If a line ends with no entry, that means that it should be
260
followed with multiple lines of values.
262
This detects the end of the list, because it will be a line that
263
does not start with '# '. Because it has to read that extra
264
line, it returns the tuple: (values, next_line)
267
for line in self.from_file:
270
values.append(line[5:-1])
273
def _read_one_patch(self, first_line=None):
274
"""Read in one patch, return the complete patch, along with
277
:return: action, lines, next_line, do_continue
282
def parse_firstline(line):
285
if line[:3] != '***':
286
raise MalformedPatches('The first line of all patches'
287
' should be a bzr meta line "***"')
290
if first_line is not None:
291
action = parse_firstline(first_line)
294
return None, [], first_line, False
297
for line in self.from_file:
299
action = parse_firstline(line)
302
return None, [], line, False
304
if line[:3] == '***':
305
return action, lines, line, True
306
elif line[:1] == '#':
307
return action, lines, line, False
309
return action, lines, None, False
311
def _read_patches(self):
315
action, lines, next_line, do_continue = \
316
self._read_one_patch(next_line)
317
if action is not None:
318
self.info.actions.append((action, lines))
321
def _read_footer(self, first_line=None):
322
"""Read the rest of the meta information.
324
:param first_line: The previous step iterates past what it
325
can handle. That extra line is given here.
327
if first_line is not None:
328
if self._handle_info_line(first_line, in_footer=True) is not None:
330
for line in self.from_file:
331
if self._handle_info_line(line, in_footer=True) is not None:
335
def read_changeset(from_file):
336
"""Read in a changeset from a filelike object (must have "readline" support), and
337
parse it into a Changeset object.
339
cr = ChangesetReader(from_file)