~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to groupcompress.py

nograph tests completely passing.

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
 
18
18
"""Core compression logic for compressing streams of related files."""
19
19
 
 
20
from cStringIO import StringIO
 
21
import zlib
 
22
 
20
23
from bzrlib import (
21
24
    annotate,
 
25
    debug,
22
26
    diff,
 
27
    errors,
23
28
    graph as _mod_graph,
24
29
    pack,
25
30
    patiencediff,
35
40
    )
36
41
from bzrlib.plugins.index2.repofmt import InMemoryBTree
37
42
from bzrlib.versionedfile import (
 
43
    adapter_registry,
 
44
    AbsentContentFactory,
38
45
    FulltextContentFactory,
39
46
    VersionedFiles,
40
47
    )
41
48
 
42
49
 
43
 
def parse(lines):
 
50
def parse(line_list):
44
51
    result = []
45
 
    lines = iter(lines)
 
52
    lines = iter(line_list)
46
53
    next = lines.next
47
 
    print next(), next()
 
54
    label_line = lines.next()
 
55
    sha1_line = lines.next()
 
56
    if (not label_line.startswith('label: ') or
 
57
        not sha1_line.startswith('sha1: ')):
 
58
        raise AssertionError("bad text record %r" % lines)
 
59
    label = tuple(label_line[7:-1].split('\x00'))
 
60
    sha1 = sha1_line[6:-1]
48
61
    for header in lines:
49
62
        op = header[0]
50
63
        numbers = header[2:]
54
67
        else:
55
68
            contents = [next() for i in xrange(numbers[0])]
56
69
            result.append((op, None, numbers[0], contents))
57
 
    return result
 
70
    return label, sha1, result
58
71
 
59
72
def apply_delta(basis, delta):
60
73
    """Apply delta to this object to become new_version_id."""
295
308
        # double handling for now. Make it work until then.
296
309
        bytes = ''.join(lines)
297
310
        record = FulltextContentFactory(key, parents, None, bytes)
298
 
        sha1 = self._insert_record_stream([record]).next()
 
311
        sha1 = list(self._insert_record_stream([record], random_id=random_id))[0]
299
312
        return sha1, len(bytes), None
300
313
 
301
314
    def annotate(self, key):
302
315
        """See VersionedFiles.annotate."""
303
316
        graph = Graph(self)
 
317
        parent_map = self.get_parent_map([key])
 
318
        if not parent_map:
 
319
            raise errors.RevisionNotPresent(key, self)
 
320
        if parent_map[key] is not None:
 
321
            search = graph._make_breadth_first_searcher([key])
 
322
            keys = set()
 
323
            while True:
 
324
                try:
 
325
                    present, ghosts = search.next_with_ghosts()
 
326
                except StopIteration:
 
327
                    break
 
328
                keys.update(present)
 
329
            parent_map = self.get_parent_map(keys)
 
330
        else:
 
331
            keys = [key]
 
332
            parent_map = {key:()}
304
333
        head_cache = _mod_graph.FrozenHeadsCache(graph)
305
 
        search = graph._make_breadth_first_searcher([key])
306
 
        keys = set()
307
 
        while True:
308
 
            try:
309
 
                present, ghosts = search.next_with_ghosts()
310
 
            except StopIteration:
311
 
                break
312
 
            keys.update(present)
313
 
        parent_map = self.get_parent_map(keys)
314
334
        parent_cache = {}
315
335
        reannotate = annotate.reannotate
316
336
        for record in self.get_record_stream(keys, 'topological', True):
321
341
                reannotate(parent_lines, fulltext, key, None, head_cache))
322
342
        return parent_cache[key]
323
343
 
 
344
    def check(self, progress_bar=None):
 
345
        """See VersionedFiles.check()."""
 
346
        keys = self.keys()
 
347
        for record in self.get_record_stream(keys, 'unordered', True):
 
348
            record.get_bytes_as('fulltext')
 
349
 
324
350
    def _check_add(self, key, lines, random_id, check_content):
325
351
        """check that version_id and lines are safe to add."""
326
352
        version_id = key[-1]
335
361
            self._check_lines_not_unicode(lines)
336
362
            self._check_lines_are_lines(lines)
337
363
 
 
364
    def get_parent_map(self, keys):
 
365
        """Get a map of the parents of keys.
 
366
 
 
367
        :param keys: The keys to look up parents for.
 
368
        :return: A mapping from keys to parents. Absent keys are absent from
 
369
            the mapping.
 
370
        """
 
371
        result = {}
 
372
        sources = [self._index]
 
373
        source_results = []
 
374
        missing = set(keys)
 
375
        for source in sources:
 
376
            if not missing:
 
377
                break
 
378
            new_result = source.get_parent_map(missing)
 
379
            source_results.append(new_result)
 
380
            result.update(new_result)
 
381
            missing.difference_update(set(new_result))
 
382
        return result
 
383
 
 
384
    def get_record_stream(self, keys, ordering, include_delta_closure):
 
385
        """Get a stream of records for keys.
 
386
 
 
387
        :param keys: The keys to include.
 
388
        :param ordering: Either 'unordered' or 'topological'. A topologically
 
389
            sorted stream has compression parents strictly before their
 
390
            children.
 
391
        :param include_delta_closure: If True then the closure across any
 
392
            compression parents will be included (in the opaque data).
 
393
        :return: An iterator of ContentFactory objects, each of which is only
 
394
            valid until the iterator is advanced.
 
395
        """
 
396
        # keys might be a generator
 
397
        keys = set(keys)
 
398
        if not keys:
 
399
            return
 
400
        if not self._index.has_graph:
 
401
            # Cannot topological order when no graph has been stored.
 
402
            ordering = 'unordered'
 
403
        # Cheap: iterate
 
404
        locations = self._index.get_build_details(keys)
 
405
        if ordering == 'topological':
 
406
            # would be better to not globally sort initially but instead
 
407
            # start with one key, recurse to its oldest parent, then grab
 
408
            # everything in the same group, etc.
 
409
            parent_map = dict((key, details[2]) for key, details in
 
410
                locations.iteritems())
 
411
            present_keys = topo_sort(parent_map)
 
412
            # Now group by source:
 
413
        else:
 
414
            present_keys = locations.keys()
 
415
        absent_keys = keys.difference(set(locations))
 
416
        for key in absent_keys:
 
417
            yield AbsentContentFactory(key)
 
418
        for key in present_keys:
 
419
            index_memo, _, parents, (method, _) = locations[key]
 
420
            # read
 
421
            read_memo = index_memo[0:3]
 
422
            zdata = self._access.get_raw_records([read_memo]).next()
 
423
            # decompress
 
424
            plain = zlib.decompress(zdata)
 
425
            # parse
 
426
            delta_lines = split_lines(plain[index_memo[3]:index_memo[4]])
 
427
            label, sha1, delta = parse(delta_lines)
 
428
            if label != key:
 
429
                raise AssertionError("wrong key: %r, wanted %r" % (label, key))
 
430
            basis = plain[:index_memo[3]]
 
431
            basis = StringIO(basis).readlines()
 
432
            #basis = split_lines(plain[:last_end])
 
433
            lines = apply_delta(basis, delta)
 
434
            bytes = ''.join(lines)
 
435
            yield FulltextContentFactory(key, parents, sha1, bytes)
 
436
            
 
437
    def get_sha1s(self, keys):
 
438
        """See VersionedFiles.get_sha1s()."""
 
439
        result = {}
 
440
        for record in self.get_record_stream(keys, 'unordered', True):
 
441
            if record.sha1 != None:
 
442
                result[record.key] = record.sha1
 
443
            else:
 
444
                if record.storage_kind != 'absent':
 
445
                    result[record.key] == sha_string(record.get_bytes_as(
 
446
                        'fulltext'))
 
447
        return result
 
448
 
338
449
    def insert_record_stream(self, stream):
339
450
        """Insert a record stream into this container.
340
451
 
342
453
        :return: None
343
454
        :seealso VersionedFiles.get_record_stream:
344
455
        """
345
 
        self._insert_record_stream(stream)
 
456
        for _ in self._insert_record_stream(stream):
 
457
            pass
346
458
 
347
 
    def _insert_record_stream(self, stream):
 
459
    def _insert_record_stream(self, stream, random_id=False):
348
460
        """Internal core to insert a record stream into this container.
349
461
 
350
462
        This helper function has a different interface than insert_record_stream
355
467
        :seealso insert_record_stream:
356
468
        :seealso add_lines:
357
469
        """
 
470
        def get_adapter(adapter_key):
 
471
            try:
 
472
                return adapters[adapter_key]
 
473
            except KeyError:
 
474
                adapter_factory = adapter_registry.get(adapter_key)
 
475
                adapter = adapter_factory(self)
 
476
                adapters[adapter_key] = adapter
 
477
                return adapter
 
478
        adapters = {}
358
479
        compressor = GroupCompressor(self._delta)
359
480
        # This will go up to fulltexts for gc to gc fetching, which isn't
360
481
        # ideal.
 
482
        keys_to_add = []
 
483
        basis_end = 0
361
484
        for record in stream:
 
485
            # Raise an error when a record is missing.
 
486
            if record.storage_kind == 'absent':
 
487
                raise errors.RevisionNotPresent([record.key], self)
 
488
            elif record.storage_kind == 'fulltext':
 
489
                bytes = record.get_bytes_as('fulltext')
 
490
            else:
 
491
                adapter_key = record.storage_kind, 'fulltext'
 
492
                adapter = get_adapter(adapter_key)
 
493
                bytes = adapter.get_bytes(record,
 
494
                    record.get_bytes_as(record.storage_kind))
362
495
            found_sha1, end_point = compressor.compress(record.key,
363
 
                split_lines(record.get_bytes_as('fulltext')), record.sha1)
 
496
                split_lines(bytes), record.sha1)
364
497
            yield found_sha1
 
498
            keys_to_add.append((record.key, '%d %d' % (basis_end, end_point),
 
499
                (record.parents,)))
 
500
            basis_end = end_point
 
501
        compressed = zlib.compress(''.join(compressor.lines))
 
502
        index, start, length = self._access.add_raw_records(
 
503
            [(None, len(compressed))], compressed)[0]
 
504
        nodes = []
 
505
        for key, reads, refs in keys_to_add:
 
506
            nodes.append((key, "%d %d %s" % (start, length, reads), refs))
 
507
        self._index.add_records(nodes, random_id=random_id)
 
508
 
 
509
    def iter_lines_added_or_present_in_keys(self, keys, pb=None):
 
510
        """Iterate over the lines in the versioned files from keys.
 
511
 
 
512
        This may return lines from other keys. Each item the returned
 
513
        iterator yields is a tuple of a line and a text version that that line
 
514
        is present in (not introduced in).
 
515
 
 
516
        Ordering of results is in whatever order is most suitable for the
 
517
        underlying storage format.
 
518
 
 
519
        If a progress bar is supplied, it may be used to indicate progress.
 
520
        The caller is responsible for cleaning up progress bars (because this
 
521
        is an iterator).
 
522
 
 
523
        NOTES:
 
524
         * Lines are normalised by the underlying store: they will all have \n
 
525
           terminators.
 
526
         * Lines are returned in arbitrary order.
 
527
 
 
528
        :return: An iterator over (line, key).
 
529
        """
 
530
        if pb is None:
 
531
            pb = progress.DummyProgress()
 
532
        keys = set(keys)
 
533
        total = len(keys)
 
534
        # we don't care about inclusions, the caller cares.
 
535
        # but we need to setup a list of records to visit.
 
536
        # we need key, position, length
 
537
        for key_idx, record in enumerate(self.get_record_stream(keys,
 
538
            'unordered', True)):
 
539
            # XXX: todo - optimise to use less than full texts.
 
540
            key = record.key
 
541
            pb.update('Walking content.', key_idx, total)
 
542
            if record.storage_kind == 'absent':
 
543
                raise errors.RevisionNotPresent(record.key, self)
 
544
            lines = split_lines(record.get_bytes_as('fulltext'))
 
545
            for line in lines:
 
546
                yield line, key
 
547
        pb.update('Walking content.', total, total)
 
548
 
 
549
    def keys(self):
 
550
        """See VersionedFiles.keys."""
 
551
        if 'evil' in debug.debug_flags:
 
552
            trace.mutter_callsite(2, "keys scales with size of history")
 
553
        sources = [self._index]
 
554
        result = set()
 
555
        for source in sources:
 
556
            result.update(source.keys())
 
557
        return result
 
558
 
365
559
 
366
560
class _GCGraphIndex(object):
367
561
    """Mapper from GroupCompressVersionedFiles needs into GraphIndex storage."""
389
583
        if deltas and not parents:
390
584
            # XXX: TODO: Delta tree and parent graph should be conceptually
391
585
            # separate.
392
 
            raise KnitCorrupt(self, "Cannot do delta compression without "
 
586
            raise errors.KnitCorrupt(self, "Cannot do delta compression without "
393
587
                "parent tracking.")
394
588
        self.has_graph = parents
395
589
        self._is_locked = is_locked
396
590
 
 
591
    def add_records(self, records, random_id=False):
 
592
        """Add multiple records to the index.
 
593
        
 
594
        This function does not insert data into the Immutable GraphIndex
 
595
        backing the KnitGraphIndex, instead it prepares data for insertion by
 
596
        the caller and checks that it is safe to insert then calls
 
597
        self._add_callback with the prepared GraphIndex nodes.
 
598
 
 
599
        :param records: a list of tuples:
 
600
                         (key, options, access_memo, parents).
 
601
        :param random_id: If True the ids being added were randomly generated
 
602
            and no check for existence will be performed.
 
603
        """
 
604
        if not self._add_callback:
 
605
            raise errors.ReadOnlyError(self)
 
606
        # we hope there are no repositories with inconsistent parentage
 
607
        # anymore.
 
608
 
 
609
        changed = False
 
610
        keys = {}
 
611
        for (key, value, refs) in records:
 
612
            if not self._parents:
 
613
                if refs:
 
614
                    for ref in refs:
 
615
                        if ref:
 
616
                            raise KnitCorrupt(self,
 
617
                                "attempt to add node with parents "
 
618
                                "in parentless index.")
 
619
                    refs = ()
 
620
                    changed = True
 
621
            keys[key] = (value, refs)
 
622
        # check for dups
 
623
        if not random_id:
 
624
            present_nodes = self._get_entries(keys)
 
625
            for (index, key, value, node_refs) in present_nodes:
 
626
                if node_refs != keys[key][1]:
 
627
                    raise errors.KnitCorrupt(self, "inconsistent details in add_records"
 
628
                        ": %s %s" % ((value, node_refs), keys[key]))
 
629
                del keys[key]
 
630
                changed = True
 
631
        if changed:
 
632
            result = []
 
633
            if self._parents:
 
634
                for key, (value, node_refs) in keys.iteritems():
 
635
                    result.append((key, value, node_refs))
 
636
            else:
 
637
                for key, (value, node_refs) in keys.iteritems():
 
638
                    result.append((key, value))
 
639
            records = result
 
640
        self._add_callback(records)
 
641
        
 
642
    def _check_read(self):
 
643
        """raise if reads are not permitted."""
 
644
        if not self._is_locked():
 
645
            raise errors.ObjectNotLocked(self)
 
646
 
397
647
    def _check_write_ok(self):
398
648
        """Assert if writes are not permitted."""
399
649
        if not self._is_locked():
400
650
            raise errors.ObjectNotLocked(self)
401
651
 
 
652
    def _get_entries(self, keys, check_present=False):
 
653
        """Get the entries for keys.
 
654
        
 
655
        :param keys: An iterable of index key tuples.
 
656
        """
 
657
        keys = set(keys)
 
658
        found_keys = set()
 
659
        if self._parents:
 
660
            for node in self._graph_index.iter_entries(keys):
 
661
                yield node
 
662
                found_keys.add(node[1])
 
663
        else:
 
664
            # adapt parentless index to the rest of the code.
 
665
            for node in self._graph_index.iter_entries(keys):
 
666
                yield node[0], node[1], node[2], ()
 
667
                found_keys.add(node[1])
 
668
        if check_present:
 
669
            missing_keys = keys.difference(found_keys)
 
670
            if missing_keys:
 
671
                raise RevisionNotPresent(missing_keys.pop(), self)
 
672
 
 
673
    def get_parent_map(self, keys):
 
674
        """Get a map of the parents of keys.
 
675
 
 
676
        :param keys: The keys to look up parents for.
 
677
        :return: A mapping from keys to parents. Absent keys are absent from
 
678
            the mapping.
 
679
        """
 
680
        self._check_read()
 
681
        nodes = self._get_entries(keys)
 
682
        result = {}
 
683
        if self._parents:
 
684
            for node in nodes:
 
685
                result[node[1]] = node[3][0]
 
686
        else:
 
687
            for node in nodes:
 
688
                result[node[1]] = None
 
689
        return result
 
690
 
 
691
    def get_build_details(self, keys):
 
692
        """Get the various build details for keys.
 
693
 
 
694
        Ghosts are omitted from the result.
 
695
 
 
696
        :param keys: An iterable of keys.
 
697
        :return: A dict of key:
 
698
            (index_memo, compression_parent, parents, record_details).
 
699
            index_memo
 
700
                opaque structure to pass to read_records to extract the raw
 
701
                data
 
702
            compression_parent
 
703
                Content that this record is built upon, may be None
 
704
            parents
 
705
                Logical parents of this node
 
706
            record_details
 
707
                extra information about the content which needs to be passed to
 
708
                Factory.parse_record
 
709
        """
 
710
        self._check_read()
 
711
        result = {}
 
712
        entries = self._get_entries(keys, False)
 
713
        for entry in entries:
 
714
            key = entry[1]
 
715
            if not self._parents:
 
716
                parents = None
 
717
            else:
 
718
                parents = entry[3][0]
 
719
            value = entry[2]
 
720
            method = 'group'
 
721
            result[key] = (self._node_to_position(entry),
 
722
                                  None, parents, (method, None))
 
723
        return result
 
724
    
 
725
    def keys(self):
 
726
        """Get all the keys in the collection.
 
727
        
 
728
        The keys are not ordered.
 
729
        """
 
730
        self._check_read()
 
731
        return [node[1] for node in self._graph_index.iter_all_entries()]
 
732
    
 
733
    def _node_to_position(self, node):
 
734
        """Convert an index value to position details."""
 
735
        bits = node[2].split(' ')
 
736
        # It would be nice not to read the entire gzip.
 
737
        start = int(bits[0])
 
738
        stop = int(bits[1])
 
739
        basis_end = int(bits[2])
 
740
        delta_end = int(bits[3])
 
741
        return node[0], start, stop, basis_end, delta_end