~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to groupcompress.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-04 21:06:22 UTC
  • mto: (0.17.34 trunk)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090304210622-ur7wz2dz0w4lhzn3
(tests broken) implement the basic ability to have a separate header
This puts the labels/sha1/etc together, and then has the actual content deltas
combined later on.

Show diffs side-by-side

added added

removed removed

Lines of Context:
53
53
    )
54
54
from bzrlib.plugins.groupcompress import errors as gc_errors
55
55
 
56
 
_NO_LABELS = False
57
56
_FAST = False
58
57
 
59
 
def parse(bytes):
60
 
    if _NO_LABELS:
61
 
        action_byte = bytes[0]
62
 
        action = {'f':'fulltext', 'd':'delta'}[action_byte]
63
 
        return action, None, None, bytes[1:]
64
 
    (action, label_line, sha1_line, len_line,
65
 
     delta_bytes) = bytes.split('\n', 4)
66
 
    if (action not in ('fulltext', 'delta')
67
 
        or not label_line.startswith('label:')
68
 
        or not sha1_line.startswith('sha1:')
69
 
        or not len_line.startswith('len:')
70
 
        ):
71
 
        raise AssertionError("bad text record %r" % (bytes,))
72
 
    label = tuple(label_line[6:].split('\x00'))
73
 
    sha1 = sha1_line[5:]
74
 
    length = int(len_line[4:])
75
 
    if not len(delta_bytes) == length:
76
 
        raise AssertionError("bad length record %r" % (bytes,))
77
 
    return action, label, sha1, delta_bytes
78
 
 
79
 
 
80
58
def encode_base128_int(val):
81
59
    """Convert an integer into a 7-bit lsb encoding."""
82
60
    bytes = []
145
123
        self.start = start # Byte offset to start of data
146
124
        self.length = length # Length of content
147
125
 
 
126
    def __repr__(self):
 
127
        return '%s(%s, %s, %s, %s, %s)' % (
 
128
            self.__class__.__name__,
 
129
            self.key, self.type, self.sha1, self.start, self.length
 
130
            )
 
131
 
148
132
 
149
133
class GroupCompressBlock(object):
150
134
    """An object which maintains the internal structure of the compressed data.
158
142
    def __init__(self):
159
143
        # map by key? or just order in file?
160
144
        self._entries = {}
 
145
        self._content = None
 
146
        self._size = 0
161
147
 
162
148
    def _parse_header(self):
163
149
        """Parse the meta-info from the stream."""
164
150
 
 
151
    def __len__(self):
 
152
        return self._size
 
153
 
165
154
    @classmethod
166
155
    def from_bytes(cls, bytes):
167
156
        out = cls()
184
173
        assert len(header_bytes) == header_length
185
174
        del z_header_bytes
186
175
        lines = header_bytes.split('\n')
 
176
        header_len = len(header_bytes)
187
177
        del header_bytes
188
178
        info_dict = {}
189
179
        for line in lines:
198
188
                value = tuple(map(intern, value.split('\x00')))
199
189
            elif key in ('start', 'length'):
200
190
                value = int(value)
 
191
            elif key == 'type':
 
192
                value = intern(value)
201
193
            info_dict[key] = value
 
194
        zcontent = bytes[pos2:]
 
195
        if zcontent:
 
196
            out._content = zlib.decompress(zcontent)
 
197
            out._size = header_len + len(out._content)
202
198
        return out
203
199
 
204
200
    def extract(self, key, sha1=None):
208
204
        :param sha1: TODO (should we validate only when sha1 is supplied?)
209
205
        :return: The bytes for the content
210
206
        """
 
207
        entry = self._entries[key]
 
208
        if entry.type == 'fulltext':
 
209
            bytes = self._content[entry.start:entry.start + entry.length]
 
210
        elif entry.type == 'delta':
 
211
            delta = self._content[entry.start:entry.start + entry.length]
 
212
            bytes = _groupcompress_pyx.apply_delta(self._content, delta)
 
213
        # XXX: sha1?
 
214
        return entry, bytes
211
215
 
212
216
    def add_entry(self, key, type, sha1, start, length):
213
217
        """Add new meta info about an entry.
281
285
        self.input_bytes = 0
282
286
        self.labels_deltas = {}
283
287
        self._delta_index = _groupcompress_pyx.DeltaIndex()
 
288
        self._block = GroupCompressBlock()
284
289
 
285
290
    def compress(self, key, bytes, expected_sha, soft=False):
286
291
        """Compress lines with label key.
304
309
            sha1 = expected_sha
305
310
        if key[-1] is None:
306
311
            key = key[:-1] + ('sha1:' + sha1,)
307
 
        label = '\x00'.join(key)
308
312
        input_len = len(bytes)
309
313
        # By having action/label/sha1/len, we can parse the group if the index
310
314
        # was ever destroyed, we have the key in 'label', we know the final
314
318
        # 'len: %d\n' costs approximately 1% increase in total data
315
319
        # Having the labels at all costs us 9-10% increase, 38% increase for
316
320
        # inventory pages, and 5.8% increase for text pages
317
 
        if _NO_LABELS:
318
 
            new_chunks = []
319
 
        else:
320
 
            new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
 
321
        # new_chunks = ['label:%s\nsha1:%s\n' % (label, sha1)]
321
322
        if self._delta_index._source_offset != self.endpoint:
322
323
            raise AssertionError('_source_offset != endpoint'
323
324
                ' somehow the DeltaIndex got out of sync with'
327
328
        if (delta is None):
328
329
            # We can't delta (perhaps source_text is empty)
329
330
            # so mark this as an insert
330
 
            if _NO_LABELS:
331
 
                new_chunks = ['f']
332
 
            else:
333
 
                new_chunks.insert(0, 'fulltext\n')
334
 
                new_chunks.append('len:%s\n' % (input_len,))
335
 
            unadded_bytes = sum(map(len, new_chunks))
336
 
            self._delta_index.add_source(bytes, unadded_bytes)
337
 
            new_chunks.append(bytes)
 
331
            self._block.add_entry(key, type='fulltext', sha1=sha1,
 
332
                                  start=self.endpoint, length=len(bytes))
 
333
            self._delta_index.add_source(bytes, 0)
 
334
            new_chunks = [bytes]
338
335
        else:
339
 
            if _NO_LABELS:
340
 
                new_chunks = ['d']
341
 
            else:
342
 
                new_chunks.insert(0, 'delta\n')
343
 
                new_chunks.append('len:%s\n' % (len(delta),))
 
336
            self._block.add_entry(key, type='delta', sha1=sha1,
 
337
                                  start=self.endpoint, length=len(delta))
 
338
            new_chunks = [delta]
344
339
            if _FAST:
345
 
                new_chunks.append(delta)
346
 
                unadded_bytes = sum(map(len, new_chunks))
347
 
                self._delta_index._source_offset += unadded_bytes
 
340
                self._delta_index._source_offset += len(delta)
348
341
            else:
349
 
                unadded_bytes = sum(map(len, new_chunks))
350
 
                self._delta_index.add_delta_source(delta, unadded_bytes)
351
 
                new_chunks.append(delta)
 
342
                self._delta_index.add_delta_source(delta, 0)
352
343
        delta_start = (self.endpoint, len(self.lines))
353
344
        self.output_chunks(new_chunks)
354
345
        self.input_bytes += input_len
368
359
        """
369
360
        delta_details = self.labels_deltas[key]
370
361
        delta_chunks = self.lines[delta_details[0][1]:delta_details[1][1]]
371
 
        action, label, sha1, delta = parse(''.join(delta_chunks))
372
 
        if not _NO_LABELS and label != key:
373
 
            raise AssertionError("wrong key: %r, wanted %r" % (label, key))
374
 
        if action == 'fulltext':
375
 
            bytes = delta
 
362
        stored_bytes = ''.join(delta_chunks)
 
363
        # TODO: Fix this, we shouldn't really be peeking here
 
364
        entry = self._block._entries[key]
 
365
        if entry.type == 'fulltext':
 
366
            bytes = stored_bytes
376
367
        else:
377
 
            source = ''.join(self.lines[delta_details[0][0]])
 
368
            assert entry.type == 'delta'
 
369
            # XXX: This is inefficient at best
 
370
            source = ''.join(self.lines)
378
371
            bytes = _groupcompress_pyx.apply_delta(source, delta)
379
 
        if _NO_LABELS:
380
 
            sha1 = sha_string(bytes)
381
 
        else:
382
 
            assert sha1 == sha_string(bytes)
383
 
        return [bytes], sha1
 
372
            assert entry.sha1 == sha_string(bytes)
 
373
        return bytes, entry.sha1
384
374
 
385
375
    def output_chunks(self, new_chunks):
386
376
        """Output some chunks.
573
563
                    result[key] = self._unadded_refs[key]
574
564
        return result
575
565
 
576
 
    def _get_group_and_delta_bytes(self, index_memo):
 
566
    def _get_block(self, index_memo):
577
567
        read_memo = index_memo[0:3]
578
568
        # get the group:
579
569
        try:
580
 
            plain = self._group_cache[read_memo]
 
570
            block = self._group_cache[read_memo]
581
571
        except KeyError:
582
572
            # read the group
583
573
            zdata = self._access.get_raw_records([read_memo]).next()
585
575
            # permits caching. We might want to store the partially
586
576
            # decompresed group and decompress object, so that recent
587
577
            # texts are not penalised by big groups.
588
 
            plain = zlib.decompress(zdata) #, index_memo[4])
589
 
            self._group_cache[read_memo] = plain
 
578
            block = GroupCompressBlock.from_bytes(zdata)
 
579
            self._group_cache[read_memo] = block
590
580
        # cheapo debugging:
591
581
        # print len(zdata), len(plain)
592
582
        # parse - requires split_lines, better to have byte offsets
593
583
        # here (but not by much - we only split the region for the
594
584
        # recipe, and we often want to end up with lines anyway.
595
 
        return plain, plain[index_memo[3]:index_memo[4]]
 
585
        return block
596
586
 
597
587
    def get_missing_compression_parent_keys(self):
598
588
        """Return the keys of missing compression parents.
671
661
                parents = self._unadded_refs[key]
672
662
            else:
673
663
                index_memo, _, parents, (method, _) = locations[key]
674
 
                plain, delta_bytes = self._get_group_and_delta_bytes(index_memo)
675
 
                action, label, sha1, delta = parse(delta_bytes)
676
 
                if not _NO_LABELS and label != key:
677
 
                    raise AssertionError("wrong key: %r, wanted %r" % (label, key))
678
 
                if action == 'fulltext':
679
 
                    chunks = [delta]
680
 
                else:
681
 
                    # TODO: relax apply_delta so that it can allow source to be
682
 
                    #       longer than expected
683
 
                    bytes = _groupcompress_pyx.apply_delta(plain, delta)
684
 
                    if bytes is None:
685
 
                        import pdb; pdb.set_trace()
686
 
                    chunks = [bytes]
687
 
                    del bytes
688
 
                if _NO_LABELS:
689
 
                    sha1 = sha_strings(chunks)
690
 
                else:
691
 
                    if not _FAST and sha_strings(chunks) != sha1:
692
 
                        raise AssertionError('sha1 sum did not match')
693
 
            yield ChunkedContentFactory(key, parents, sha1, chunks)
 
664
                block = self._get_block(index_memo)
 
665
                entry, bytes = block.extract(key)
 
666
                sha1 = entry.sha1
 
667
                if not _FAST and sha_string(bytes) != sha1:
 
668
                    raise AssertionError('sha1 sum did not match')
 
669
            yield FulltextContentFactory(key, parents, sha1, bytes)
694
670
 
695
671
    def get_sha1s(self, keys):
696
672
        """See VersionedFiles.get_sha1s()."""
742
718
        basis_end = 0
743
719
        groups = 1
744
720
        def flush():
 
721
            # TODO: we may want to have the header compressed in the same chain
 
722
            #       as the data, or we may not, evaulate it
 
723
            #       having them compressed together is probably a win for
 
724
            #       revisions and the 'inv' portion of chk inventories. As the
 
725
            #       label in the header is duplicated in the text.
 
726
            #       For chk pages and real bytes, I would guess this is not
 
727
            #       true.
 
728
            header = self._compressor._block.to_bytes()
745
729
            compressed = zlib.compress(''.join(self._compressor.lines))
 
730
            out = header + compressed
746
731
            index, start, length = self._access.add_raw_records(
747
 
                [(None, len(compressed))], compressed)[0]
 
732
                [(None, len(out))], out)[0]
748
733
            nodes = []
749
734
            for key, reads, refs in keys_to_add:
750
735
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
1024
1009
        return node[0], start, stop, basis_end, delta_end
1025
1010
 
1026
1011
 
1027
 
def _get_longest_match(equivalence_table, pos, max_pos, locations):
1028
 
    """Get the longest possible match for the current position."""
1029
 
    range_start = pos
1030
 
    range_len = 0
1031
 
    copy_ends = None
1032
 
    while pos < max_pos:
1033
 
        if locations is None:
1034
 
            locations = equivalence_table.get_idx_matches(pos)
1035
 
        if locations is None:
1036
 
            # No more matches, just return whatever we have, but we know that
1037
 
            # this last position is not going to match anything
1038
 
            pos += 1
1039
 
            break
1040
 
        else:
1041
 
            if copy_ends is None:
1042
 
                # We are starting a new range
1043
 
                copy_ends = [loc + 1 for loc in locations]
1044
 
                range_len = 1
1045
 
                locations = None # Consumed
1046
 
            else:
1047
 
                # We are currently in the middle of a match
1048
 
                next_locations = set(copy_ends).intersection(locations)
1049
 
                if len(next_locations):
1050
 
                    # range continues
1051
 
                    copy_ends = [loc + 1 for loc in next_locations]
1052
 
                    range_len += 1
1053
 
                    locations = None # Consumed
1054
 
                else:
1055
 
                    # But we are done with this match, we should be
1056
 
                    # starting a new one, though. We will pass back 'locations'
1057
 
                    # so that we don't have to do another lookup.
1058
 
                    break
1059
 
        pos += 1
1060
 
    if copy_ends is None:
1061
 
        return None, pos, locations
1062
 
    return ((min(copy_ends) - range_len, range_start, range_len)), pos, locations
1063
 
 
1064
 
 
1065
1012
try:
1066
1013
    from bzrlib.plugins.groupcompress import _groupcompress_pyx
1067
1014
except ImportError: