~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to groupcompress.py

Bring in the 'rabin' experiment.
Change the names and disk-strings for the various repository formats.
Make the CHK format repositories all 'rich-root' we can introduce non-rich-root later.
Make a couple other small tweaks, like copyright statements, etc.
Remove patch-delta.c, at this point, it was only a reference implementation,
as we have fully integrated the patching into pyrex, to allow nicer exception
handling.

Show diffs side-by-side

added added

removed removed

Lines of Context:
52
52
    VersionedFiles,
53
53
    )
54
54
 
55
 
 
56
 
def parse(line_list):
57
 
    result = []
58
 
    lines = iter(line_list)
59
 
    next = lines.next
60
 
    label_line = next()
61
 
    sha1_line = next()
62
 
    if (not label_line.startswith('label: ') or
63
 
        not sha1_line.startswith('sha1: ')):
64
 
        raise AssertionError("bad text record %r" % lines)
65
 
    label = tuple(label_line[7:-1].split('\x00'))
66
 
    sha1 = sha1_line[6:-1]
67
 
    for header in lines:
68
 
        op = header[0]
69
 
        numbers = header[2:]
70
 
        numbers = [int(n) for n in header[2:].split(',')]
71
 
        if op == 'c':
72
 
            result.append((op, numbers[0], numbers[1], None))
73
 
        else:
74
 
            contents = [next() for i in xrange(numbers[0])]
75
 
            result.append((op, None, numbers[0], contents))
76
 
    ## return result
77
 
    return label, sha1, result
78
 
 
79
 
 
80
 
def apply_delta(basis, delta):
81
 
    """Apply delta to this object to become new_version_id."""
82
 
    lines = []
83
 
    last_offset = 0
84
 
    # eq ranges occur where gaps occur
85
 
    # start, end refer to offsets in basis
86
 
    for op, start, count, delta_lines in delta:
87
 
        if op == 'c':
88
 
            lines.append(basis[start:start+count])
89
 
        else:
90
 
            lines.extend(delta_lines)
91
 
    trim_encoding_newline(lines)
92
 
    return lines
93
 
 
94
 
 
95
 
def trim_encoding_newline(lines):
96
 
    if lines[-1] == '\n':
97
 
        del lines[-1]
98
 
    else:
99
 
        lines[-1] = lines[-1][:-1]
100
 
 
 
55
_NO_LABELS = False
 
56
_FAST = False
 
57
 
 
58
def parse(bytes):
 
59
    if _NO_LABELS:
 
60
        action_byte = bytes[0]
 
61
        action = {'f':'fulltext', 'd':'delta'}[action_byte]
 
62
        return action, None, None, bytes[1:]
 
63
    (action, label_line, sha1_line, len_line,
 
64
     delta_bytes) = bytes.split('\n', 4)
 
65
    if (action not in ('fulltext', 'delta')
 
66
        or not label_line.startswith('label:')
 
67
        or not sha1_line.startswith('sha1:')
 
68
        or not len_line.startswith('len:')
 
69
        ):
 
70
        raise AssertionError("bad text record %r" % (bytes,))
 
71
    label = tuple(label_line[6:].split('\x00'))
 
72
    sha1 = sha1_line[5:]
 
73
    length = int(len_line[4:])
 
74
    if not len(delta_bytes) == length:
 
75
        raise AssertionError("bad length record %r" % (bytes,))
 
76
    return action, label, sha1, delta_bytes
101
77
 
102
78
 
103
79
def sort_gc_optimal(parent_map):
130
106
 
131
107
class GroupCompressor(object):
132
108
    """Produce a serialised group of compressed texts.
133
 
    
 
109
 
134
110
    It contains code very similar to SequenceMatcher because of having a similar
135
111
    task. However some key differences apply:
136
112
     - there is no junk, we want a minimal edit not a human readable diff.
144
120
       left side.
145
121
    """
146
122
 
147
 
    _equivalence_table_class = equivalence_table.EquivalenceTable
148
 
 
149
123
    def __init__(self, delta=True):
150
124
        """Create a GroupCompressor.
151
125
 
152
126
        :param delta: If False, do not compress records.
153
127
        """
154
 
        self._delta = delta
155
 
        self.line_offsets = []
 
128
        # Consider seeding the lines with some sort of GC Start flag, or
 
129
        # putting it as part of the output stream, rather than in the
 
130
        # compressed bytes.
 
131
        self.lines = []
156
132
        self.endpoint = 0
157
133
        self.input_bytes = 0
158
 
        self.line_locations = self._equivalence_table_class([])
159
 
        self.lines = self.line_locations.lines
160
134
        self.labels_deltas = {}
161
 
        self._present_prefixes = set()
162
 
 
163
 
    def get_matching_blocks(self, lines, soft=False):
164
 
        """Return the ranges in lines which match self.lines.
165
 
 
166
 
        :param lines: lines to compress
167
 
        :return: A list of (old_start, new_start, length) tuples which reflect
168
 
            a region in self.lines that is present in lines.  The last element
169
 
            of the list is always (old_len, new_len, 0) to provide a end point
170
 
            for generating instructions from the matching blocks list.
171
 
        """
172
 
        result = []
173
 
        pos = 0
174
 
        line_locations = self.line_locations
175
 
        line_locations.set_right_lines(lines)
176
 
        # We either copy a range (while there are reusable lines) or we 
177
 
        # insert new lines. To find reusable lines we traverse 
178
 
        locations = None
179
 
        max_pos = len(lines)
180
 
        result_append = result.append
181
 
        min_match_bytes = 10
182
 
        if soft:
183
 
            min_match_bytes = 200
184
 
        while pos < max_pos:
185
 
            block, pos, locations = _get_longest_match(line_locations, pos,
186
 
                                                       max_pos, locations)
187
 
            if block is not None:
188
 
                # Check to see if we are matching fewer than 5 characters,
189
 
                # which is turned into a simple 'insert', rather than a copy
190
 
                # If we have more than 5 lines, we definitely have more than 5
191
 
                # chars
192
 
                if block[-1] < min_match_bytes:
193
 
                    # This block may be a 'short' block, check
194
 
                    old_start, new_start, range_len = block
195
 
                    matched_bytes = sum(map(len,
196
 
                        lines[new_start:new_start + range_len]))
197
 
                    if matched_bytes < min_match_bytes:
198
 
                        block = None
199
 
            if block is not None:
200
 
                result_append(block)
201
 
        result_append((len(self.lines), len(lines), 0))
202
 
        return result
203
 
 
204
 
    def compress(self, key, lines, expected_sha, soft=False):
 
135
        self._delta_index = _groupcompress_pyx.DeltaIndex()
 
136
 
 
137
    def compress(self, key, bytes, expected_sha, soft=False):
205
138
        """Compress lines with label key.
206
139
 
207
140
        :param key: A key tuple. It is stored in the output
208
141
            for identification of the text during decompression. If the last
209
142
            element is 'None' it is replaced with the sha1 of the text -
210
143
            e.g. sha1:xxxxxxx.
211
 
        :param lines: The lines to be compressed. Must be split
212
 
            on \n, with the \n preserved.'
 
144
        :param bytes: The bytes to be compressed
213
145
        :param expected_sha: If non-None, the sha the lines are believed to
214
146
            have. During compression the sha is calculated; a mismatch will
215
147
            cause an error.
218
150
        :return: The sha1 of lines, and the number of bytes accumulated in
219
151
            the group output so far.
220
152
        """
221
 
        sha1 = sha_strings(lines)
 
153
        if not _FAST or expected_sha is None:
 
154
            sha1 = sha_string(bytes)
 
155
        else:
 
156
            sha1 = expected_sha
222
157
        if key[-1] is None:
223
158
            key = key[:-1] + ('sha1:' + sha1,)
224
159
        label = '\x00'.join(key)
225
 
        ## new_lines = []
226
 
        new_lines = ['label: %s\n' % label,
227
 
                     'sha1: %s\n' % sha1,
228
 
                    ]
229
 
        ## index_lines = []
230
 
        index_lines = [False, False]
231
 
        # setup good encoding for trailing \n support.
232
 
        if not lines or lines[-1].endswith('\n'):
233
 
            lines.append('\n')
234
 
        else:
235
 
            lines[-1] = lines[-1] + '\n'
236
 
        pos = 0
237
 
        range_len = 0
238
 
        range_start = 0
239
 
        flush_range = self.flush_range
240
 
        copy_ends = None
241
 
        blocks = self.get_matching_blocks(lines, soft=soft)
242
 
        current_pos = 0
243
 
        #copies_without_insertion = []
244
 
        # We either copy a range (while there are reusable lines) or we
245
 
        # insert new lines. To find reusable lines we traverse
246
 
        for old_start, new_start, range_len in blocks:
247
 
            if new_start != current_pos:
248
 
                # if copies_without_insertion:
249
 
                #     self.flush_multi(copies_without_insertion,
250
 
                #                      lines, new_lines, index_lines)
251
 
                #     copies_without_insertion = []
252
 
                # non-matching region
253
 
                flush_range(current_pos, None, new_start - current_pos,
254
 
                    lines, new_lines, index_lines)
255
 
            current_pos = new_start + range_len
256
 
            if not range_len:
257
 
                continue
258
 
            # copies_without_insertion.append((new_start, old_start, range_len))
259
 
            flush_range(new_start, old_start, range_len,
260
 
                        lines, new_lines, index_lines)
261
 
        # if copies_without_insertion:
262
 
        #     self.flush_multi(copies_without_insertion,
263
 
        #                      lines, new_lines, index_lines)
264
 
        #     copies_without_insertion = []
 
160
        input_len = len(bytes)
 
161
        # By having action/label/sha1/len, we can parse the group if the index
 
162
        # was ever destroyed, we have the key in 'label', we know the final
 
163
        # bytes are valid from sha1, and we know where to find the end of this
 
164
        # record because of 'len'. (the delta record itself will store the
 
165
        # total length for the expanded record)
 
166
        # 'len: %d\n' costs approximately 1% increase in total data
 
167
        # Having the labels at all costs us 9-10% increase, 38% increase for
 
168
        # inventory pages, and 5.8% increase for text pages
 
169
        if _NO_LABELS:
 
170
            new_chunks = []
 
171
        else:
 
172
            new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
 
173
        if self._delta_index._source_offset != self.endpoint:
 
174
            raise AssertionError('_source_offset != endpoint'
 
175
                ' somehow the DeltaIndex got out of sync with'
 
176
                ' the output lines')
 
177
        max_delta_size = len(bytes) / 2
 
178
        delta = self._delta_index.make_delta(bytes, max_delta_size)
 
179
        if (delta is None):
 
180
            # We can't delta (perhaps source_text is empty)
 
181
            # so mark this as an insert
 
182
            if _NO_LABELS:
 
183
                new_chunks = ['f']
 
184
            else:
 
185
                new_chunks.insert(0, 'fulltext\n')
 
186
                new_chunks.append('len:%s\n' % (input_len,))
 
187
            unadded_bytes = sum(map(len, new_chunks))
 
188
            self._delta_index.add_source(bytes, unadded_bytes)
 
189
            new_chunks.append(bytes)
 
190
        else:
 
191
            if _NO_LABELS:
 
192
                new_chunks = ['d']
 
193
            else:
 
194
                new_chunks.insert(0, 'delta\n')
 
195
                new_chunks.append('len:%s\n' % (len(delta),))
 
196
            if _FAST:
 
197
                new_chunks.append(delta)
 
198
                unadded_bytes = sum(map(len, new_chunks))
 
199
                self._delta_index._source_offset += unadded_bytes
 
200
            else:
 
201
                unadded_bytes = sum(map(len, new_chunks))
 
202
                self._delta_index.add_delta_source(delta, unadded_bytes)
 
203
                new_chunks.append(delta)
265
204
        delta_start = (self.endpoint, len(self.lines))
266
 
        self.output_lines(new_lines, index_lines)
267
 
        trim_encoding_newline(lines)
268
 
        self.input_bytes += sum(map(len, lines))
 
205
        self.output_chunks(new_chunks)
 
206
        self.input_bytes += input_len
269
207
        delta_end = (self.endpoint, len(self.lines))
270
208
        self.labels_deltas[key] = (delta_start, delta_end)
 
209
        if not self._delta_index._source_offset == self.endpoint:
 
210
            raise AssertionError('the delta index is out of sync'
 
211
                'with the output lines %s != %s'
 
212
                % (self._delta_index._source_offset, self.endpoint))
271
213
        return sha1, self.endpoint
272
214
 
273
215
    def extract(self, key):
274
216
        """Extract a key previously added to the compressor.
275
 
        
 
217
 
276
218
        :param key: The key to extract.
277
219
        :return: An iterable over bytes and the sha1.
278
220
        """
279
221
        delta_details = self.labels_deltas[key]
280
 
        delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
281
 
        label, sha1, delta = parse(delta_lines)
282
 
        ## delta = parse(delta_lines)
283
 
        if label != key:
 
222
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
 
223
        action, label, sha1, delta = parse(''.join(delta_chunks))
 
224
        if not _NO_LABELS and label != key:
284
225
            raise AssertionError("wrong key: %r, wanted %r" % (label, key))
285
 
        # Perhaps we want to keep the line offsets too in memory at least?
286
 
        chunks = apply_delta(''.join(self.lines), delta)
287
 
        sha1 = sha_strings(chunks)
288
 
        return chunks, sha1
289
 
 
290
 
    def flush_multi(self, instructions, lines, new_lines, index_lines):
291
 
        """Flush a bunch of different ranges out.
292
 
 
293
 
        This should only be called with data that are "pure" copies.
294
 
        """
295
 
        flush_range = self.flush_range
296
 
        if len(instructions) > 2:
297
 
            # This is the number of lines to be copied
298
 
            total_copy_range = sum(i[2] for i in instructions)
299
 
            if len(instructions) > 0.5 * total_copy_range:
300
 
                # We are copying N lines, but taking more than N/2
301
 
                # copy instructions to do so. We will go ahead and expand this
302
 
                # text so that other code is able to match against it
303
 
                flush_range(instructions[0][0], None, total_copy_range,
304
 
                            lines, new_lines, index_lines)
305
 
                return
306
 
        for ns, os, rl in instructions:
307
 
            flush_range(ns, os, rl, lines, new_lines, index_lines)
308
 
 
309
 
    def flush_range(self, range_start, copy_start, range_len, lines, new_lines, index_lines):
310
 
        insert_instruction = "i,%d\n" % range_len
311
 
        if copy_start is not None:
312
 
            # range stops, flush and start a new copy range
313
 
            stop_byte = self.line_offsets[copy_start + range_len - 1]
314
 
            if copy_start == 0:
315
 
                start_byte = 0
316
 
            else:
317
 
                start_byte = self.line_offsets[copy_start - 1]
318
 
            bytes = stop_byte - start_byte
319
 
            copy_control_instruction = "c,%d,%d\n" % (start_byte, bytes)
320
 
            if (bytes + len(insert_instruction) >
321
 
                len(copy_control_instruction)):
322
 
                new_lines.append(copy_control_instruction)
323
 
                index_lines.append(False)
324
 
                return
325
 
        # not copying, or inserting is shorter than copying, so insert.
326
 
        new_lines.append(insert_instruction)
327
 
        new_lines.extend(lines[range_start:range_start+range_len])
328
 
        index_lines.append(False)
329
 
        index_lines.extend([copy_start is None]*range_len)
330
 
 
331
 
    def output_lines(self, new_lines, index_lines):
332
 
        """Output some lines.
333
 
 
334
 
        :param new_lines: The lines to output.
335
 
        :param index_lines: A boolean flag for each line - when True, index
336
 
            that line.
337
 
        """
338
 
        # indexed_newlines = [idx for idx, val in enumerate(index_lines)
339
 
        #                          if val and new_lines[idx] == '\n']
340
 
        # if indexed_newlines:
341
 
        #     import pdb; pdb.set_trace()
 
226
        if action == 'fulltext':
 
227
            bytes = delta
 
228
        else:
 
229
            source = ''.join(self.lines[delta_details[0][0]])
 
230
            bytes = _groupcompress_pyx.apply_delta(source, delta)
 
231
        if _NO_LABELS:
 
232
            sha1 = sha_string(bytes)
 
233
        else:
 
234
            assert sha1 == sha_string(bytes)
 
235
        return [bytes], sha1
 
236
 
 
237
    def output_chunks(self, new_chunks):
 
238
        """Output some chunks.
 
239
 
 
240
        :param new_chunks: The chunks to output.
 
241
        """
342
242
        endpoint = self.endpoint
343
 
        self.line_locations.extend_lines(new_lines, index_lines)
344
 
        for line in new_lines:
345
 
            endpoint += len(line)
346
 
            self.line_offsets.append(endpoint)
 
243
        self.lines.extend(new_chunks)
 
244
        endpoint += sum(map(len, new_chunks))
347
245
        self.endpoint = endpoint
348
246
 
349
247
    def ratio(self):
527
425
                    result[key] = self._unadded_refs[key]
528
426
        return result
529
427
 
530
 
    def _get_delta_lines(self, key):
531
 
        """Helper function for manual debugging.
532
 
 
533
 
        This is a convenience function that shouldn't be used in production
534
 
        code.
535
 
        """
536
 
        build_details = self._index.get_build_details([key])[key]
537
 
        index_memo = build_details[0]
538
 
        group, delta_lines = self._get_group_and_delta_lines(index_memo)
539
 
        return delta_lines
540
 
 
541
 
    def _get_group_and_delta_lines(self, index_memo):
 
428
    def _get_group_and_delta_bytes(self, index_memo):
542
429
        read_memo = index_memo[0:3]
543
430
        # get the group:
544
431
        try:
557
444
        # parse - requires split_lines, better to have byte offsets
558
445
        # here (but not by much - we only split the region for the
559
446
        # recipe, and we often want to end up with lines anyway.
560
 
        return plain, split_lines(plain[index_memo[3]:index_memo[4]])
 
447
        return plain, plain[index_memo[3]:index_memo[4]]
561
448
 
562
449
    def get_missing_compression_parent_keys(self):
563
450
        """Return the keys of missing compression parents.
636
523
                parents = self._unadded_refs[key]
637
524
            else:
638
525
                index_memo, _, parents, (method, _) = locations[key]
639
 
                plain, delta_lines = self._get_group_and_delta_lines(index_memo)
640
 
                label, sha1, delta = parse(delta_lines)
641
 
                if label != key:
 
526
                plain, delta_bytes = self._get_group_and_delta_bytes(index_memo)
 
527
                action, label, sha1, delta = parse(delta_bytes)
 
528
                if not _NO_LABELS and label != key:
642
529
                    raise AssertionError("wrong key: %r, wanted %r" % (label, key))
643
 
                chunks = apply_delta(plain, delta)
644
 
                if sha_strings(chunks) != sha1:
645
 
                    raise AssertionError('sha1 sum did not match')
 
530
                if action == 'fulltext':
 
531
                    chunks = [delta]
 
532
                else:
 
533
                    # TODO: relax apply_delta so that it can allow source to be
 
534
                    #       longer than expected
 
535
                    bytes = _groupcompress_pyx.apply_delta(plain, delta)
 
536
                    if bytes is None:
 
537
                        import pdb; pdb.set_trace()
 
538
                    chunks = [bytes]
 
539
                    del bytes
 
540
                if _NO_LABELS:
 
541
                    sha1 = sha_strings(chunks)
 
542
                else:
 
543
                    if not _FAST and sha_strings(chunks) != sha1:
 
544
                        raise AssertionError('sha1 sum did not match')
646
545
            yield ChunkedContentFactory(key, parents, sha1, chunks)
647
546
 
648
547
    def get_sha1s(self, keys):
708
607
            if record.storage_kind == 'absent':
709
608
                raise errors.RevisionNotPresent(record.key, self)
710
609
            try:
711
 
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
610
                bytes = record.get_bytes_as('fulltext')
712
611
            except errors.UnavailableRepresentation:
713
 
                try:
714
 
                    bytes = record.get_bytes_as('fulltext')
715
 
                except errors.UnavailableRepresentation:
716
 
                    adapter_key = record.storage_kind, 'fulltext'
717
 
                    adapter = get_adapter(adapter_key)
718
 
                    bytes = adapter.get_bytes(record)
719
 
                lines = osutils.split_lines(bytes)
 
612
                adapter_key = record.storage_kind, 'fulltext'
 
613
                adapter = get_adapter(adapter_key)
 
614
                bytes = adapter.get_bytes(record)
720
615
            soft = False
721
616
            if len(record.key) > 1:
722
617
                prefix = record.key[0]
723
618
                if (last_prefix is not None and prefix != last_prefix):
724
619
                    soft = True
725
 
                    if basis_end > 1024 * 1024 * 4:
 
620
                    if basis_end > 1024 * 1024 * 2:
726
621
                        flush()
727
622
                        self._compressor = GroupCompressor(self._delta)
728
623
                        self._unadded_refs = {}
731
626
                        groups += 1
732
627
                last_prefix = prefix
733
628
            found_sha1, end_point = self._compressor.compress(record.key,
734
 
                lines, record.sha1, soft=soft)
 
629
                bytes, record.sha1, soft=soft)
735
630
            if record.key[-1] is None:
736
631
                key = record.key[:-1] + ('sha1:' + found_sha1,)
737
632
            else:
741
636
            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
742
637
                (record.parents,)))
743
638
            basis_end = end_point
744
 
            if basis_end > 1024 * 1024 * 8:
 
639
            if basis_end > 1024 * 1024 * 4:
745
640
                flush()
746
641
                self._compressor = GroupCompressor(self._delta)
747
642
                self._unadded_refs = {}
785
680
            'unordered', True)):
786
681
            # XXX: todo - optimise to use less than full texts.
787
682
            key = record.key
788
 
            pb.update('Walking content.', key_idx + 1, total)
 
683
            pb.update('Walking content.', key_idx, total)
789
684
            if record.storage_kind == 'absent':
790
685
                raise errors.RevisionNotPresent(key, self)
791
686
            lines = split_lines(record.get_bytes_as('fulltext'))
1020
915
 
1021
916
 
1022
917
try:
1023
 
    from bzrlib.plugins.groupcompress import _groupcompress_c
 
918
    from bzrlib.plugins.groupcompress import _groupcompress_pyx
1024
919
except ImportError:
1025
920
    pass
1026
 
else:
1027
 
    GroupCompressor._equivalence_table_class = _groupcompress_c.EquivalenceTable
1028
 
    _get_longest_match = _groupcompress_c._get_longest_match