~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to groupcompress.py

Merge trunk

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
    diff,
28
28
    errors,
29
29
    graph as _mod_graph,
 
30
    osutils,
30
31
    pack,
31
32
    patiencediff,
32
33
    )
46
47
from bzrlib.versionedfile import (
47
48
    adapter_registry,
48
49
    AbsentContentFactory,
 
50
    ChunkedContentFactory,
49
51
    FulltextContentFactory,
50
52
    VersionedFiles,
51
53
    )
55
57
    result = []
56
58
    lines = iter(line_list)
57
59
    next = lines.next
58
 
    label_line = lines.next()
59
 
    sha1_line = lines.next()
 
60
    label_line = next()
 
61
    sha1_line = next()
60
62
    if (not label_line.startswith('label: ') or
61
63
        not sha1_line.startswith('sha1: ')):
62
64
        raise AssertionError("bad text record %r" % lines)
71
73
        else:
72
74
            contents = [next() for i in xrange(numbers[0])]
73
75
            result.append((op, None, numbers[0], contents))
 
76
    ## return result
74
77
    return label, sha1, result
75
78
 
 
79
 
76
80
def apply_delta(basis, delta):
77
81
    """Apply delta to this object to become new_version_id."""
78
82
    lines = []
95
99
        lines[-1] = lines[-1][:-1]
96
100
 
97
101
 
 
102
 
 
103
def sort_gc_optimal(parent_map):
 
104
    """Sort and group the keys in parent_map into gc-optimal order.
 
105
 
 
106
    gc-optimal is defined (currently) as reverse-topological order, grouped by
 
107
    the key prefix.
 
108
 
 
109
    :return: A sorted-list of keys
 
110
    """
 
111
    # gc-optimal ordering is approximately reverse topological,
 
112
    # properly grouped by file-id.
 
113
    per_prefix_map = {}
 
114
    for item in parent_map.iteritems():
 
115
        key = item[0]
 
116
        if isinstance(key, str) or len(key) == 1:
 
117
            prefix = ''
 
118
        else:
 
119
            prefix = key[0]
 
120
        try:
 
121
            per_prefix_map[prefix].append(item)
 
122
        except KeyError:
 
123
            per_prefix_map[prefix] = [item]
 
124
 
 
125
    present_keys = []
 
126
    for prefix in sorted(per_prefix_map):
 
127
        present_keys.extend(reversed(topo_sort(per_prefix_map[prefix])))
 
128
    return present_keys
 
129
 
 
130
 
98
131
class GroupCompressor(object):
99
132
    """Produce a serialised group of compressed texts.
100
133
    
116
149
    def __init__(self, delta=True):
117
150
        """Create a GroupCompressor.
118
151
 
119
 
        :paeam delta: If False, do not compress records.
 
152
        :param delta: If False, do not compress records.
120
153
        """
121
154
        self._delta = delta
122
155
        self.line_offsets = []
125
158
        self.line_locations = self._equivalence_table_class([])
126
159
        self.lines = self.line_locations.lines
127
160
        self.labels_deltas = {}
 
161
        self._present_prefixes = set()
128
162
 
129
 
    def get_matching_blocks(self, lines):
130
 
        """Return an the ranges in lines which match self.lines.
 
163
    def get_matching_blocks(self, lines, soft=False):
 
164
        """Return the ranges in lines which match self.lines.
131
165
 
132
166
        :param lines: lines to compress
133
167
        :return: A list of (old_start, new_start, length) tuples which reflect
143
177
        # insert new lines. To find reusable lines we traverse 
144
178
        locations = None
145
179
        max_pos = len(lines)
146
 
        max_time = 0.0
147
 
        max_info = None
148
180
        result_append = result.append
 
181
        min_match_bytes = 10
 
182
        if soft:
 
183
            min_match_bytes = 200
149
184
        while pos < max_pos:
150
185
            block, pos, locations = _get_longest_match(line_locations, pos,
151
186
                                                       max_pos, locations)
152
187
            if block is not None:
 
188
                # Check to see if we are matching fewer than 5 characters,
 
189
                # which is turned into a simple 'insert', rather than a copy
 
190
                # If we have more than 5 lines, we definitely have more than 5
 
191
                # chars
 
192
                if block[-1] < min_match_bytes:
 
193
                    # This block may be a 'short' block, check
 
194
                    old_start, new_start, range_len = block
 
195
                    matched_bytes = sum(map(len,
 
196
                        lines[new_start:new_start + range_len]))
 
197
                    if matched_bytes < min_match_bytes:
 
198
                        block = None
 
199
            if block is not None:
153
200
                result_append(block)
154
201
        result_append((len(self.lines), len(lines), 0))
155
202
        return result
156
203
 
157
 
    def compress(self, key, lines, expected_sha):
 
204
    def compress(self, key, lines, expected_sha, soft=False):
158
205
        """Compress lines with label key.
159
206
 
160
207
        :param key: A key tuple. It is stored in the output
163
210
            e.g. sha1:xxxxxxx.
164
211
        :param lines: The lines to be compressed. Must be split
165
212
            on \n, with the \n preserved.'
166
 
        :param expected_sha: If non-None, the sha the lines are blieved to
 
213
        :param expected_sha: If non-None, the sha the lines are believed to
167
214
            have. During compression the sha is calculated; a mismatch will
168
215
            cause an error.
 
216
        :param soft: Do a 'soft' compression. This means that we require larger
 
217
            ranges to match to be considered for a copy command.
169
218
        :return: The sha1 of lines, and the number of bytes accumulated in
170
219
            the group output so far.
171
220
        """
173
222
        if key[-1] is None:
174
223
            key = key[:-1] + ('sha1:' + sha1,)
175
224
        label = '\x00'.join(key)
 
225
        ## new_lines = []
 
226
        new_lines = ['label: %s\n' % label,
 
227
                     'sha1: %s\n' % sha1,
 
228
                    ]
 
229
        ## index_lines = []
 
230
        index_lines = [False, False]
176
231
        # setup good encoding for trailing \n support.
177
232
        if not lines or lines[-1].endswith('\n'):
178
233
            lines.append('\n')
179
234
        else:
180
235
            lines[-1] = lines[-1] + '\n'
181
 
        new_lines = []
182
 
        new_lines.append('label: %s\n' % label)
183
 
        new_lines.append('sha1: %s\n' % sha1)
184
 
        index_lines = [False, False]
185
236
        pos = 0
186
237
        range_len = 0
187
238
        range_start = 0
188
239
        flush_range = self.flush_range
189
240
        copy_ends = None
190
 
        blocks = self.get_matching_blocks(lines)
 
241
        blocks = self.get_matching_blocks(lines, soft=soft)
191
242
        current_pos = 0
192
 
        # We either copy a range (while there are reusable lines) or we 
193
 
        # insert new lines. To find reusable lines we traverse 
 
243
        #copies_without_insertion = []
 
244
        # We either copy a range (while there are reusable lines) or we
 
245
        # insert new lines. To find reusable lines we traverse
194
246
        for old_start, new_start, range_len in blocks:
195
247
            if new_start != current_pos:
 
248
                # if copies_without_insertion:
 
249
                #     self.flush_multi(copies_without_insertion,
 
250
                #                      lines, new_lines, index_lines)
 
251
                #     copies_without_insertion = []
196
252
                # non-matching region
197
253
                flush_range(current_pos, None, new_start - current_pos,
198
254
                    lines, new_lines, index_lines)
199
255
            current_pos = new_start + range_len
200
256
            if not range_len:
201
257
                continue
202
 
            flush_range(new_start, old_start, range_len, lines,
203
 
                new_lines, index_lines)
 
258
            # copies_without_insertion.append((new_start, old_start, range_len))
 
259
            flush_range(new_start, old_start, range_len,
 
260
                        lines, new_lines, index_lines)
 
261
        # if copies_without_insertion:
 
262
        #     self.flush_multi(copies_without_insertion,
 
263
        #                      lines, new_lines, index_lines)
 
264
        #     copies_without_insertion = []
204
265
        delta_start = (self.endpoint, len(self.lines))
205
266
        self.output_lines(new_lines, index_lines)
206
267
        trim_encoding_newline(lines)
218
279
        delta_details = self.labels_deltas[key]
219
280
        delta_lines = self.lines[delta_details[0][1]:delta_details[1][1]]
220
281
        label, sha1, delta = parse(delta_lines)
 
282
        ## delta = parse(delta_lines)
221
283
        if label != key:
222
284
            raise AssertionError("wrong key: %r, wanted %r" % (label, key))
223
285
        # Perhaps we want to keep the line offsets too in memory at least?
224
 
        lines = apply_delta(''.join(self.lines), delta)
225
 
        sha1 = sha_strings(lines)
226
 
        return lines, sha1
 
286
        chunks = apply_delta(''.join(self.lines), delta)
 
287
        sha1 = sha_strings(chunks)
 
288
        return chunks, sha1
 
289
 
 
290
    def flush_multi(self, instructions, lines, new_lines, index_lines):
 
291
        """Flush a bunch of different ranges out.
 
292
 
 
293
        This should only be called with data that are "pure" copies.
 
294
        """
 
295
        flush_range = self.flush_range
 
296
        if len(instructions) > 2:
 
297
            # This is the number of lines to be copied
 
298
            total_copy_range = sum(i[2] for i in instructions)
 
299
            if len(instructions) > 0.5 * total_copy_range:
 
300
                # We are copying N lines, but taking more than N/2
 
301
                # copy instructions to do so. We will go ahead and expand this
 
302
                # text so that other code is able to match against it
 
303
                flush_range(instructions[0][0], None, total_copy_range,
 
304
                            lines, new_lines, index_lines)
 
305
                return
 
306
        for ns, os, rl in instructions:
 
307
            flush_range(ns, os, rl, lines, new_lines, index_lines)
227
308
 
228
309
    def flush_range(self, range_start, copy_start, range_len, lines, new_lines, index_lines):
229
310
        insert_instruction = "i,%d\n" % range_len
284
365
        parents = graph or delta
285
366
        ref_length = 0
286
367
        if graph:
287
 
            ref_length += 1
 
368
            ref_length = 1
288
369
        graph_index = BTreeBuilder(reference_lists=ref_length,
289
370
            key_elements=keylength)
290
371
        stream = transport.open_write_stream('newpack')
366
447
            # an empty tuple instead.
367
448
            parents = ()
368
449
        # double handling for now. Make it work until then.
369
 
        bytes = ''.join(lines)
370
 
        record = FulltextContentFactory(key, parents, None, bytes)
 
450
        length = sum(map(len, lines))
 
451
        record = ChunkedContentFactory(key, parents, None, lines)
371
452
        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
372
 
        return sha1, len(bytes), None
 
453
        return sha1, length, None
373
454
 
374
455
    def annotate(self, key):
375
456
        """See VersionedFiles.annotate."""
395
476
        reannotate = annotate.reannotate
396
477
        for record in self.get_record_stream(keys, 'topological', True):
397
478
            key = record.key
398
 
            fulltext = split_lines(record.get_bytes_as('fulltext'))
 
479
            chunks = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
399
480
            parent_lines = [parent_cache[parent] for parent in parent_map[key]]
400
481
            parent_cache[key] = list(
401
 
                reannotate(parent_lines, fulltext, key, None, head_cache))
 
482
                reannotate(parent_lines, chunks, key, None, head_cache))
402
483
        return parent_cache[key]
403
484
 
404
485
    def check(self, progress_bar=None):
446
527
                    result[key] = self._unadded_refs[key]
447
528
        return result
448
529
 
 
530
    def _get_delta_lines(self, key):
 
531
        """Helper function for manual debugging.
 
532
 
 
533
        This is a convenience function that shouldn't be used in production
 
534
        code.
 
535
        """
 
536
        build_details = self._index.get_build_details([key])[key]
 
537
        index_memo = build_details[0]
 
538
        group, delta_lines = self._get_group_and_delta_lines(index_memo)
 
539
        return delta_lines
 
540
 
 
541
    def _get_group_and_delta_lines(self, index_memo):
 
542
        read_memo = index_memo[0:3]
 
543
        # get the group:
 
544
        try:
 
545
            plain = self._group_cache[read_memo]
 
546
        except KeyError:
 
547
            # read the group
 
548
            zdata = self._access.get_raw_records([read_memo]).next()
 
549
            # decompress - whole thing - this is not a bug, as it
 
550
            # permits caching. We might want to store the partially
 
551
            # decompresed group and decompress object, so that recent
 
552
            # texts are not penalised by big groups.
 
553
            plain = zlib.decompress(zdata) #, index_memo[4])
 
554
            self._group_cache[read_memo] = plain
 
555
        # cheapo debugging:
 
556
        # print len(zdata), len(plain)
 
557
        # parse - requires split_lines, better to have byte offsets
 
558
        # here (but not by much - we only split the region for the
 
559
        # recipe, and we often want to end up with lines anyway.
 
560
        return plain, split_lines(plain[index_memo[3]:index_memo[4]])
 
561
 
 
562
    def get_missing_compression_parent_keys(self):
 
563
        """Return the keys of missing compression parents.
 
564
 
 
565
        Missing compression parents occur when a record stream was missing
 
566
        basis texts, or a index was scanned that had missing basis texts.
 
567
        """
 
568
        # GroupCompress cannot currently reference texts that are not in the
 
569
        # group, so this is valid for now
 
570
        return frozenset()
 
571
 
449
572
    def get_record_stream(self, keys, ordering, include_delta_closure):
450
573
        """Get a stream of records for keys.
451
574
 
459
582
            valid until the iterator is advanced.
460
583
        """
461
584
        # keys might be a generator
462
 
        keys = set(keys)
 
585
        orig_keys = list(keys)
 
586
        keys = set(orig_keys)
463
587
        if not keys:
464
588
            return
465
 
        if not self._index.has_graph:
 
589
        if (not self._index.has_graph
 
590
            and ordering in ('topological', 'gc-optimal')):
466
591
            # Cannot topological order when no graph has been stored.
467
592
            ordering = 'unordered'
468
593
        # Cheap: iterate
469
594
        locations = self._index.get_build_details(keys)
 
595
        local_keys = frozenset(keys).intersection(set(self._unadded_refs))
470
596
        if ordering == 'topological':
471
597
            # would be better to not globally sort initially but instead
472
598
            # start with one key, recurse to its oldest parent, then grab
473
599
            # everything in the same group, etc.
474
600
            parent_map = dict((key, details[2]) for key, details in
475
601
                locations.iteritems())
476
 
            local = frozenset(keys).intersection(set(self._unadded_refs))
477
 
            for key in local:
 
602
            for key in local_keys:
478
603
                parent_map[key] = self._unadded_refs[key]
479
 
                locations[key] = None
480
604
            present_keys = topo_sort(parent_map)
481
605
            # Now group by source:
 
606
        elif ordering == 'gc-optimal':
 
607
            parent_map = dict((key, details[2]) for key, details in
 
608
                              locations.iteritems())
 
609
            for key in local_keys:
 
610
                parent_map[key] = self._unadded_refs[key]
 
611
            # XXX: This only optimizes for the target ordering. We may need to
 
612
            #      balance that with the time it takes to extract ordering, by
 
613
            #      somehow grouping based on locations[key][0:3]
 
614
            present_keys = sort_gc_optimal(parent_map)
 
615
        elif ordering == 'as-requested':
 
616
            present_keys = [key for key in orig_keys if key in locations
 
617
                            or key in local_keys]
482
618
        else:
483
 
            present_keys = locations.keys()
484
 
            local = frozenset(keys).intersection(set(self._unadded_refs))
485
 
            for key in local:
486
 
                present_keys.append(key)
487
 
                locations[key] = None
 
619
            # We want to yield the keys in a semi-optimal (read-wise) ordering.
 
620
            # Otherwise we thrash the _group_cache and destroy performance
 
621
            def get_group(key):
 
622
                # This is the group the bytes are stored in, followed by the
 
623
                # location in the group
 
624
                return locations[key][0]
 
625
            present_keys = sorted(locations.iterkeys(), key=get_group)
 
626
            # We don't have an ordering for keys in the in-memory object, but
 
627
            # lets process the in-memory ones first.
 
628
            present_keys = list(local_keys) + present_keys
 
629
        locations.update((key, None) for key in local_keys)
488
630
        absent_keys = keys.difference(set(locations))
489
631
        for key in absent_keys:
490
632
            yield AbsentContentFactory(key)
491
633
        for key in present_keys:
492
634
            if key in self._unadded_refs:
493
 
                lines, sha1 = self._compressor.extract(key)
 
635
                chunks, sha1 = self._compressor.extract(key)
494
636
                parents = self._unadded_refs[key]
495
637
            else:
496
638
                index_memo, _, parents, (method, _) = locations[key]
497
 
                read_memo = index_memo[0:3]
498
 
                # get the group:
499
 
                try:
500
 
                    plain = self._group_cache[read_memo]
501
 
                except KeyError:
502
 
                    # read the group
503
 
                    zdata = self._access.get_raw_records([read_memo]).next()
504
 
                    # decompress - whole thing - this is not a bug, as it
505
 
                    # permits caching. We might want to store the partially
506
 
                    # decompresed group and decompress object, so that recent
507
 
                    # texts are not penalised by big groups.
508
 
                    decomp = zlib.decompressobj()
509
 
                    plain = decomp.decompress(zdata) #, index_memo[4])
510
 
                    self._group_cache[read_memo] = plain
511
 
                # cheapo debugging:
512
 
                # print len(zdata), len(plain)
513
 
                # parse - requires split_lines, better to have byte offsets
514
 
                # here (but not by much - we only split the region for the
515
 
                # recipe, and we often want to end up with lines anyway.
516
 
                delta_lines = split_lines(plain[index_memo[3]:index_memo[4]])
 
639
                plain, delta_lines = self._get_group_and_delta_lines(index_memo)
517
640
                label, sha1, delta = parse(delta_lines)
518
641
                if label != key:
519
642
                    raise AssertionError("wrong key: %r, wanted %r" % (label, key))
520
 
                lines = apply_delta(plain, delta)
521
 
            bytes = ''.join(lines)
522
 
            yield FulltextContentFactory(key, parents, sha1, bytes)
523
 
            
 
643
                chunks = apply_delta(plain, delta)
 
644
                if sha_strings(chunks) != sha1:
 
645
                    raise AssertionError('sha1 sum did not match')
 
646
            yield ChunkedContentFactory(key, parents, sha1, chunks)
 
647
 
524
648
    def get_sha1s(self, keys):
525
649
        """See VersionedFiles.get_sha1s()."""
526
650
        result = {}
554
678
        :seealso insert_record_stream:
555
679
        :seealso add_lines:
556
680
        """
 
681
        adapters = {}
557
682
        def get_adapter(adapter_key):
558
683
            try:
559
684
                return adapters[adapter_key]
562
687
                adapter = adapter_factory(self)
563
688
                adapters[adapter_key] = adapter
564
689
                return adapter
565
 
        adapters = {}
566
690
        # This will go up to fulltexts for gc to gc fetching, which isn't
567
691
        # ideal.
568
692
        self._compressor = GroupCompressor(self._delta)
578
702
            for key, reads, refs in keys_to_add:
579
703
                nodes.append((key, "%d %d %s" % (start, length, reads), refs))
580
704
            self._index.add_records(nodes, random_id=random_id)
 
705
        last_prefix = None
581
706
        for record in stream:
582
707
            # Raise an error when a record is missing.
583
708
            if record.storage_kind == 'absent':
584
 
                raise errors.RevisionNotPresent([record.key], self)
585
 
            elif record.storage_kind == 'fulltext':
586
 
                bytes = record.get_bytes_as('fulltext')
587
 
            else:
588
 
                adapter_key = record.storage_kind, 'fulltext'
589
 
                adapter = get_adapter(adapter_key)
590
 
                bytes = adapter.get_bytes(record)
 
709
                raise errors.RevisionNotPresent(record.key, self)
 
710
            try:
 
711
                lines = osutils.chunks_to_lines(record.get_bytes_as('chunked'))
 
712
            except errors.UnavailableRepresentation:
 
713
                try:
 
714
                    bytes = record.get_bytes_as('fulltext')
 
715
                except errors.UnavailableRepresentation:
 
716
                    adapter_key = record.storage_kind, 'fulltext'
 
717
                    adapter = get_adapter(adapter_key)
 
718
                    bytes = adapter.get_bytes(record)
 
719
                lines = osutils.split_lines(bytes)
 
720
            soft = False
 
721
            if len(record.key) > 1:
 
722
                prefix = record.key[0]
 
723
                if (last_prefix is not None and prefix != last_prefix):
 
724
                    soft = True
 
725
                    if basis_end > 1024 * 1024 * 4:
 
726
                        flush()
 
727
                        self._compressor = GroupCompressor(self._delta)
 
728
                        self._unadded_refs = {}
 
729
                        keys_to_add = []
 
730
                        basis_end = 0
 
731
                        groups += 1
 
732
                last_prefix = prefix
591
733
            found_sha1, end_point = self._compressor.compress(record.key,
592
 
                split_lines(bytes), record.sha1)
 
734
                lines, record.sha1, soft=soft)
593
735
            if record.key[-1] is None:
594
736
                key = record.key[:-1] + ('sha1:' + found_sha1,)
595
737
            else:
599
741
            keys_to_add.append((key, '%d %d' % (basis_end, end_point),
600
742
                (record.parents,)))
601
743
            basis_end = end_point
602
 
            if basis_end > 1024 * 1024 * 20:
 
744
            if basis_end > 1024 * 1024 * 8:
603
745
                flush()
604
746
                self._compressor = GroupCompressor(self._delta)
605
747
                self._unadded_refs = {}
643
785
            'unordered', True)):
644
786
            # XXX: todo - optimise to use less than full texts.
645
787
            key = record.key
646
 
            pb.update('Walking content.', key_idx, total)
 
788
            pb.update('Walking content.', key_idx + 1, total)
647
789
            if record.storage_kind == 'absent':
648
 
                raise errors.RevisionNotPresent(record.key, self)
 
790
                raise errors.RevisionNotPresent(key, self)
649
791
            lines = split_lines(record.get_bytes_as('fulltext'))
650
792
            for line in lines:
651
793
                yield line, key
670
812
        """Construct a _GCGraphIndex on a graph_index.
671
813
 
672
814
        :param graph_index: An implementation of bzrlib.index.GraphIndex.
673
 
        :param is_locked: A callback to check whether the object should answer
674
 
            queries.
 
815
        :param is_locked: A callback, returns True if the index is locked and
 
816
            thus usable.
675
817
        :param parents: If True, record knits parents, if not do not record 
676
818
            parents.
677
819
        :param add_callback: If not None, allow additions to the index and call
678
820
            this callback with a list of added GraphIndex nodes:
679
821
            [(node, value, node_refs), ...]
680
 
        :param is_locked: A callback, returns True if the index is locked and
681
 
            thus usable.
682
822
        """
683
823
        self._add_callback = add_callback
684
824
        self._graph_index = graph_index
738
878
        self._add_callback(records)
739
879
        
740
880
    def _check_read(self):
741
 
        """raise if reads are not permitted."""
 
881
        """Raise an exception if reads are not permitted."""
742
882
        if not self._is_locked():
743
883
            raise errors.ObjectNotLocked(self)
744
884
 
745
885
    def _check_write_ok(self):
746
 
        """Assert if writes are not permitted."""
 
886
        """Raise an exception if writes are not permitted."""
747
887
        if not self._is_locked():
748
888
            raise errors.ObjectNotLocked(self)
749
889
 
750
890
    def _get_entries(self, keys, check_present=False):
751
891
        """Get the entries for keys.
752
 
        
 
892
 
 
893
        Note: Callers are responsible for checking that the index is locked
 
894
        before calling this method.
 
895
 
753
896
        :param keys: An iterable of index key tuples.
754
897
        """
755
898
        keys = set(keys)
807
950
        """
808
951
        self._check_read()
809
952
        result = {}
810
 
        entries = self._get_entries(keys, False)
 
953
        entries = self._get_entries(keys)
811
954
        for entry in entries:
812
955
            key = entry[1]
813
956
            if not self._parents:
814
957
                parents = None
815
958
            else:
816
959
                parents = entry[3][0]
817
 
            value = entry[2]
818
960
            method = 'group'
819
961
            result[key] = (self._node_to_position(entry),
820
962
                                  None, parents, (method, None))