from bzrlib.osutils import sha_string, sha_strings
from bzrlib.plugins.groupcompress import equivalence_table
from bzrlib.plugins.groupcompress import _groupcompress_pyx

_FAST = False
_NO_LABELS = False


def parse(line_list):
    """Parse a line-based text record into its label, sha1 and delta ops."""
    result = []
    lines = iter(line_list)
    next = lines.next
    label_line = next()
    sha1_line = next()
    if (not label_line.startswith('label: ') or
        not sha1_line.startswith('sha1: ')):
        raise AssertionError("bad text record %r" % (line_list,))
    label = tuple(label_line[7:-1].split('\x00'))
    sha1 = sha1_line[6:-1]
    for header in lines:
        op = header[0]
        numbers = [int(n) for n in header[2:].split(',')]
        if op == 'c':
            # copy: start and length are byte offsets into the basis
            result.append((op, numbers[0], numbers[1], None))
        else:
            # insert: the next numbers[0] lines are the literal content
            contents = [next() for i in xrange(numbers[0])]
            result.append((op, None, numbers[0], contents))
    return label, sha1, result
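
# Usage sketch (all values hypothetical): a record that copies the first
# 11 bytes of the basis and then inserts one new line parses as:
#
#   record = ['label: file-id\x00rev-1\n',
#             'sha1: f572d396fae9206628714fb2ce00f72e94f2258f\n',
#             'c,0,11\n',
#             'i,1\n',
#             'new text\n']
#   label, sha1, ops = parse(record)
#   # label == ('file-id', 'rev-1')
#   # ops == [('c', 0, 11, None), ('i', None, 1, ['new text\n'])]
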

def apply_delta(basis, delta):
    """Apply a parsed delta to basis bytes, returning the new lines."""
    lines = []
    # eq ranges occur where gaps occur
    # start, count refer to byte offsets in basis
    for op, start, count, delta_lines in delta:
        if op == 'c':
            lines.append(basis[start:start+count])
        else:
            lines.extend(delta_lines)
    trim_encoding_newline(lines)
    return lines


def trim_encoding_newline(lines):
    """Drop the extra newline that was added to make encoding safe."""
    lines[-1] = lines[-1][:-1]
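
# Round-trip sketch (hypothetical values), continuing the parse() example:
#
#   basis = 'first line\n'   # 11 bytes, the group content so far
#   ops = [('c', 0, 11, None), ('i', None, 1, ['new text\n'])]
#   apply_delta(basis, ops)
#   # -> ['first line\n', 'new text']  (encoding newline trimmed)
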

def parse(bytes):
    """Parse a bytes-based record; supersedes the line-based parse above."""
    if _NO_LABELS:
        # Unlabelled records are just an action byte plus the payload.
        action_byte = bytes[0]
        action = {'f':'fulltext', 'd':'delta'}[action_byte]
        return action, None, None, bytes[1:]
    (action, label_line, sha1_line, len_line,
     delta_bytes) = bytes.split('\n', 4)
    if (action not in ('fulltext', 'delta')
        or not label_line.startswith('label:')
        or not sha1_line.startswith('sha1:')
        or not len_line.startswith('len:')
        ):
        raise AssertionError("bad text record %r" % (bytes,))
    label = tuple(label_line[6:].split('\x00'))
    sha1 = sha1_line[5:]
    length = int(len_line[4:])
    if not len(delta_bytes) == length:
        raise AssertionError("bad length record %r" % (bytes,))
    return action, label, sha1, delta_bytes
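
# Usage sketch (hypothetical values): a labelled fulltext record carrying
# the six bytes 'hello\n' would parse as:
#
#   bytes = ('fulltext\n'
#            'label:file-id\x00rev-1\n'
#            'sha1:f572d396fae9206628714fb2ce00f72e94f2258f\n'
#            'len:6\n'
#            'hello\n')
#   action, label, sha1, content = parse(bytes)
#   # action == 'fulltext', label == ('file-id', 'rev-1')
#   # content == 'hello\n'
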

def sort_gc_optimal(parent_map):
    """Sort and group the keys in parent_map into gc-optimal order."""


class GroupCompressor(object):
    """Produce a serialised group of compressed texts."""

    _equivalence_table_class = equivalence_table.EquivalenceTable

    def __init__(self, delta=True):
        """Create a GroupCompressor.

        :param delta: If False, do not compress records.
        """
        self._delta = delta
        self.line_offsets = []
        # Consider seeding the lines with some sort of GC Start flag, or
        # putting it as part of the output stream, rather than in the
        # stored lines themselves.
        self.endpoint = 0
        self.input_bytes = 0
        self.line_locations = self._equivalence_table_class([])
        self.lines = self.line_locations.lines
        self.labels_deltas = {}
        self._present_prefixes = set()
        self._delta_index = _groupcompress_pyx.DeltaIndex()

    def get_matching_blocks(self, lines, soft=False):
        """Return the ranges in lines which match self.lines.

        :param lines: lines to compress
        :return: A list of (old_start, new_start, length) tuples which reflect
            a region in self.lines that is present in lines. The last element
            of the list is always (old_len, new_len, 0) to provide an end point
            for generating instructions from the matching blocks list.
        """
        result = []
        pos = 0
        line_locations = self.line_locations
        line_locations.set_right_lines(lines)
        # We either copy a range (while there are reusable lines) or we
        # insert new lines. To find reusable lines we traverse the matching
        # blocks.
        locations = None
        max_pos = len(lines)
        result_append = result.append
        min_match_bytes = 10
        if soft:
            min_match_bytes = 200
        while pos < max_pos:
            block, pos, locations = _get_longest_match(line_locations, pos,
                                                       max_pos, locations)
            if block is not None:
                # Check to see if we are matching fewer than 5 characters,
                # which is turned into a simple 'insert', rather than a copy
                # If we have more than 5 lines, we definitely have more than 5
                # characters
                if block[-1] < min_match_bytes:
                    # This block may be a 'short' block, check
                    old_start, new_start, range_len = block
                    matched_bytes = sum(map(len,
                        lines[new_start:new_start + range_len]))
                    if matched_bytes < min_match_bytes:
                        block = None
            if block is not None:
                result_append(block)
        result_append((len(self.lines), len(lines), 0))
        return result
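
    # Usage sketch (hypothetical values): matching two lines that already
    # exist at positions 3-4 of self.lines, with nothing else in common,
    # might return:
    #
    #   blocks = self.get_matching_blocks(['a\n', 'b\n'])
    #   # -> [(3, 0, 2), (len(self.lines), 2, 0)]   # match + end sentinel
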

    def compress(self, key, lines, expected_sha, soft=False):
        """Compress lines with label key. (line-based variant)

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param lines: The lines to be compressed. Must be split
            on \n, with the \n preserved.
        :param expected_sha: If non-None, the sha the lines are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :param soft: Do a 'soft' compression. This means that we require
            larger ranges to match to be considered for a copy command.
        :return: The sha1 of lines, and the number of bytes accumulated in
            the group output so far.
        """
        sha1 = sha_strings(lines)
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)
        label = '\x00'.join(key)
        new_lines = ['label: %s\n' % label,
                     'sha1: %s\n' % sha1]
        index_lines = [False, False]
        # setup good encoding for trailing \n support.
        if not lines or lines[-1].endswith('\n'):
            lines.append('\n')
        else:
            lines[-1] = lines[-1] + '\n'
        flush_range = self.flush_range
        current_pos = 0
        blocks = self.get_matching_blocks(lines, soft=soft)
        #copies_without_insertion = []
        # We either copy a range (while there are reusable lines) or we
        # insert new lines. To find reusable lines we traverse the matching
        # blocks.
        for old_start, new_start, range_len in blocks:
            if new_start != current_pos:
                # if copies_without_insertion:
                #     self.flush_multi(copies_without_insertion,
                #                      lines, new_lines, index_lines)
                #     copies_without_insertion = []
                # non-matching region
                flush_range(current_pos, None, new_start - current_pos,
                    lines, new_lines, index_lines)
            current_pos = new_start + range_len
            if not range_len:
                # the end sentinel has zero length, nothing to copy
                continue
            # copies_without_insertion.append((new_start, old_start, range_len))
            flush_range(new_start, old_start, range_len,
                lines, new_lines, index_lines)
        # if copies_without_insertion:
        #     self.flush_multi(copies_without_insertion,
        #                      lines, new_lines, index_lines)
        #     copies_without_insertion = []
        delta_start = (self.endpoint, len(self.lines))
        self.output_lines(new_lines, index_lines)
        trim_encoding_newline(lines)
        self.input_bytes += sum(map(len, lines))
        delta_end = (self.endpoint, len(self.lines))
        self.labels_deltas[key] = (delta_start, delta_end)
        return sha1, self.endpoint

    def compress(self, key, bytes, expected_sha, soft=False):
        """Compress bytes with label key; supersedes the line-based variant.

        :param key: A key tuple. It is stored in the output
            for identification of the text during decompression. If the last
            element is 'None' it is replaced with the sha1 of the text -
            e.g. sha1:xxxxxxx.
        :param bytes: The bytes to be compressed
        :param expected_sha: If non-None, the sha the bytes are believed to
            have. During compression the sha is calculated; a mismatch will
            cause an error.
        :param soft: Do a 'soft' compression. This means that we require
            larger ranges to match to be considered for a copy command.
        :return: The sha1 of bytes, and the number of bytes accumulated in
            the group output so far.
        """
        if not _FAST or expected_sha is None:
            sha1 = sha_string(bytes)
        else:
            sha1 = expected_sha
        if key[-1] is None:
            key = key[:-1] + ('sha1:' + sha1,)
        label = '\x00'.join(key)
        input_len = len(bytes)
        # By having action/label/sha1/len, we can parse the group if the index
        # was ever destroyed, we have the key in 'label', we know the final
        # bytes are valid from sha1, and we know where to find the end of this
        # record because of 'len'. (the delta record itself will store the
        # total length for the expanded record)
        # 'len: %d\n' costs approximately 1% increase in total data
        # Having the labels at all costs us 9-10% increase, 38% increase for
        # inventory pages, and 5.8% increase for text pages
        new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
        if self._delta_index._source_offset != self.endpoint:
            raise AssertionError('_source_offset != endpoint'
                ' somehow the DeltaIndex got out of sync with'
                ' the output lines')
        max_delta_size = len(bytes) / 2
        delta = self._delta_index.make_delta(bytes, max_delta_size)
        if delta is None:
            # We can't delta (perhaps source_text is empty)
            # so mark this as an insert
            new_chunks.insert(0, 'fulltext\n')
            new_chunks.append('len:%s\n' % (input_len,))
            unadded_bytes = sum(map(len, new_chunks))
            self._delta_index.add_source(bytes, unadded_bytes)
            new_chunks.append(bytes)
        else:
            new_chunks.insert(0, 'delta\n')
            new_chunks.append('len:%s\n' % (len(delta),))
            if _FAST:
                new_chunks.append(delta)
                unadded_bytes = sum(map(len, new_chunks))
                self._delta_index._source_offset += unadded_bytes
            else:
                unadded_bytes = sum(map(len, new_chunks))
                self._delta_index.add_delta_source(delta, unadded_bytes)
                new_chunks.append(delta)
        delta_start = (self.endpoint, len(self.lines))
        self.output_chunks(new_chunks)
        self.input_bytes += input_len
        delta_end = (self.endpoint, len(self.lines))
        self.labels_deltas[key] = (delta_start, delta_end)
        if not self._delta_index._source_offset == self.endpoint:
            raise AssertionError('the delta index is out of sync'
                ' with the output lines %s != %s'
                % (self._delta_index._source_offset, self.endpoint))
        return sha1, self.endpoint
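
    # Usage sketch (hypothetical values): the first text in a group has
    # nothing to delta against, so it is stored as a labelled fulltext:
    #
    #   compressor = GroupCompressor()
    #   sha1, end = compressor.compress(('file-id', 'rev-1', None),
    #                                   'hello\n', None)
    #   # group bytes: 'fulltext\nlabel:...\nsha1:...\nlen:6\nhello\n'
    #   # a second, similar text would come out as a 'delta\n...' record
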

    def extract(self, key):
        """Extract a key previously added to the compressor.

        :param key: The key to extract.
        :return: An iterable over bytes and the sha1.
        """
        delta_details = self.labels_deltas[key]
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
        action, label, sha1, delta = parse(''.join(delta_chunks))
        ## line-based variant:
        ## label, sha1, delta = parse(delta_chunks)
        if not _NO_LABELS and label != key:
            raise AssertionError("wrong key: %r, wanted %r" % (label, key))
        if action == 'fulltext':
            bytes = delta
        else:
            # Perhaps we want to keep the line offsets too in memory at least?
            source = ''.join(self.lines[:delta_details[0][1]])
            bytes = _groupcompress_pyx.apply_delta(source, delta)
            ## line-based variant:
            ## chunks = apply_delta(''.join(self.lines), delta)
            ## sha1 = sha_strings(chunks)
        if sha1 is None:
            # unlabelled records do not carry their sha1
            sha1 = sha_string(bytes)
        else:
            assert sha1 == sha_string(bytes)
        return [bytes], sha1
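
    # Round-trip sketch, continuing the compress() example above: the key
    # must be the completed key, because compress() replaced the trailing
    # None with 'sha1:...' before storing it in labels_deltas.
    #
    #   full_key = ('file-id', 'rev-1', 'sha1:' + sha1)
    #   chunks, extracted_sha1 = compressor.extract(full_key)
    #   # ''.join(chunks) == 'hello\n'
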

    def flush_multi(self, instructions, lines, new_lines, index_lines):
        """Flush a bunch of different ranges out.

        This should only be called with data that are "pure" copies.
        """
        flush_range = self.flush_range
        if len(instructions) > 2:
            # This is the number of lines to be copied
            total_copy_range = sum(i[2] for i in instructions)
            if len(instructions) > 0.5 * total_copy_range:
                # We are copying N lines, but taking more than N/2
                # copy instructions to do so. We will go ahead and expand this
                # text so that other code is able to match against it
                flush_range(instructions[0][0], None, total_copy_range,
                    lines, new_lines, index_lines)
                return
        for ns, os, rl in instructions:
            flush_range(ns, os, rl, lines, new_lines, index_lines)
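
    # Decision sketch (hypothetical values): copying 3 lines via 3 separate
    # copy instructions trips the len(instructions) > 0.5 * total_copy_range
    # test (3 > 1.5), so the ranges are re-emitted as one literal insert
    # that later texts can match against:
    #
    #   self.flush_multi([(0, 10, 1), (1, 14, 1), (2, 20, 1)],
    #                    lines, new_lines, index_lines)
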

    def flush_range(self, range_start, copy_start, range_len,
                    lines, new_lines, index_lines):
        """Emit a copy or insert instruction for one range of lines."""
        insert_instruction = "i,%d\n" % range_len
        if copy_start is not None:
            # range stops, flush and start a new copy range
            stop_byte = self.line_offsets[copy_start + range_len - 1]
            if copy_start == 0:
                start_byte = 0
            else:
                start_byte = self.line_offsets[copy_start - 1]
            bytes = stop_byte - start_byte
            copy_control_instruction = "c,%d,%d\n" % (start_byte, bytes)
            if (bytes + len(insert_instruction) >
                len(copy_control_instruction)):
                new_lines.append(copy_control_instruction)
                index_lines.append(False)
                return
        # not copying, or inserting is shorter than copying, so insert.
        new_lines.append(insert_instruction)
        new_lines.extend(lines[range_start:range_start+range_len])
        index_lines.append(False)
        index_lines.extend([copy_start is None]*range_len)
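
    # Encoding sketch (hypothetical offsets): a reusable range whose bytes
    # span group offsets 200-1199 is emitted as the single control line
    # 'c,200,1000\n', while a 2-line non-matching region is emitted as
    # 'i,2\n' followed by the 2 literal lines. The size test above only
    # emits a copy when the literal insert would be larger.
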

    def output_lines(self, new_lines, index_lines):
        """Output some lines.

        :param new_lines: The lines to output.
        :param index_lines: A boolean flag for each line - when True, index
            that line.
        """
        # indexed_newlines = [idx for idx, val in enumerate(index_lines)
        #                     if val and new_lines[idx] == '\n']
        # if indexed_newlines:
        #     import pdb; pdb.set_trace()
        endpoint = self.endpoint
        self.line_locations.extend_lines(new_lines, index_lines)
        for line in new_lines:
            endpoint += len(line)
            self.line_offsets.append(endpoint)
        self.endpoint = endpoint

    def output_chunks(self, new_chunks):
        """Output some chunks; supersedes the line-based output_lines.

        :param new_chunks: The chunks to output.
        """
        endpoint = self.endpoint
        self.lines.extend(new_chunks)
        endpoint += sum(map(len, new_chunks))
        self.endpoint = endpoint
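
    # Bookkeeping sketch (hypothetical sizes): starting from an empty group,
    # outputting a 47-byte record moves self.endpoint from 0 to 47, and
    # line_offsets records the running end offset of every line so that
    # flush_range can translate line indices into the byte offsets used by
    # 'c,start,length' instructions.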