~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2006-03-09 06:39:13 UTC
  • mfrom: (1596.2.6 integration)
  • Revision ID: pqm@pqm.ubuntu.com-20060309063913-6d8ce700706d0802
Merge knit performance stage 1.

Show diffs side-by-side

added added

removed removed

Lines of Context:
63
63
# record content length ?
64
64
                  
65
65
 
 
66
from copy import copy
66
67
from cStringIO import StringIO
67
68
import difflib
68
69
from difflib import SequenceMatcher
69
70
from gzip import GzipFile
 
71
from itertools import izip
70
72
import os
71
73
 
 
74
 
 
75
import bzrlib
72
76
import bzrlib.errors as errors
73
77
from bzrlib.errors import FileExists, NoSuchFile, KnitError, \
74
78
        InvalidRevisionId, KnitCorrupt, KnitHeaderError, \
239
243
        """
240
244
        if access_mode is None:
241
245
            access_mode = 'w'
 
246
        super(KnitVersionedFile, self).__init__(access_mode)
242
247
        assert access_mode in ('r', 'w'), "invalid mode specified %r" % access_mode
243
248
        assert not basis_knit or isinstance(basis_knit, KnitVersionedFile), \
244
249
            type(basis_knit)
255
260
        self._data = _KnitData(transport, relpath + DATA_SUFFIX,
256
261
            access_mode, create=not len(self.versions()))
257
262
 
 
263
    def clear_cache(self):
 
264
        """Clear the data cache only."""
 
265
        self._data.clear_cache()
 
266
 
258
267
    def copy_to(self, name, transport):
259
268
        """See VersionedFile.copy_to()."""
260
269
        # copy the current index to a temp index to avoid racing with local
268
277
    def create_empty(self, name, transport, mode=None):
269
278
        return KnitVersionedFile(name, transport, factory=self.factory, delta=self.delta, create=True)
270
279
    
 
280
    def _fix_parents(self, version, new_parents):
 
281
        """Fix the parents list for version.
 
282
        
 
283
        This is done by appending a new version to the index
 
284
        with identical data except for the parents list.
 
285
        the parents list must be a superset of the current
 
286
        list.
 
287
        """
 
288
        current_values = self._index._cache[version]
 
289
        assert set(current_values[4]).difference(set(new_parents)) == set()
 
290
        self._index.add_version(version,
 
291
                                current_values[1], 
 
292
                                current_values[2],
 
293
                                current_values[3],
 
294
                                new_parents)
 
295
 
 
296
    def get_graph_with_ghosts(self):
 
297
        """See VersionedFile.get_graph_with_ghosts()."""
 
298
        graph_items = self._index.get_graph()
 
299
        return dict(graph_items)
 
300
 
271
301
    @staticmethod
272
302
    def get_suffixes():
273
303
        """See VersionedFile.get_suffixes()."""
274
304
        return [DATA_SUFFIX, INDEX_SUFFIX]
275
305
 
 
306
    def has_ghost(self, version_id):
 
307
        """True if there is a ghost reference in the file to version_id."""
 
308
        # maybe we have it
 
309
        if self.has_version(version_id):
 
310
            return False
 
311
        # optimisable if needed by memoising the _ghosts set.
 
312
        items = self._index.get_graph()
 
313
        for node, parents in items:
 
314
            for parent in parents:
 
315
                if parent not in self._index._cache:
 
316
                    if parent == version_id:
 
317
                        return True
 
318
        return False
 
319
 
276
320
    def versions(self):
277
321
        """See VersionedFile.versions."""
278
322
        return self._index.get_versions()
389
433
        if version_ids:
390
434
            raise RevisionNotPresent(list(version_ids)[0], self.filename)
391
435
 
392
 
    def add_lines(self, version_id, parents, lines):
 
436
    def _add_lines_with_ghosts(self, version_id, parents, lines):
 
437
        """See VersionedFile.add_lines_with_ghosts()."""
 
438
        self._check_add(version_id, lines)
 
439
        return self._add(version_id, lines[:], parents, self.delta)
 
440
 
 
441
    def _add_lines(self, version_id, parents, lines):
393
442
        """See VersionedFile.add_lines."""
 
443
        self._check_add(version_id, lines)
 
444
        self._check_versions_present(parents)
 
445
        return self._add(version_id, lines[:], parents, self.delta)
 
446
 
 
447
    def _check_add(self, version_id, lines):
 
448
        """check that version_id and lines are safe to add."""
394
449
        assert self.writable, "knit is not opened for write"
395
450
        ### FIXME escape. RBC 20060228
396
451
        if contains_whitespace(version_id):
398
453
        if self.has_version(version_id):
399
454
            raise RevisionAlreadyPresent(version_id, self.filename)
400
455
 
401
 
        if True or __debug__:
 
456
        if False or __debug__:
402
457
            for l in lines:
403
458
                assert '\n' not in l[:-1]
404
459
 
405
 
        self._check_versions_present(parents)
406
 
        return self._add(version_id, lines[:], parents, self.delta)
407
 
 
408
460
    def _add(self, version_id, lines, parents, delta):
409
461
        """Add a set of lines on top of version specified by parents.
410
462
 
411
463
        If delta is true, compress the text as a line-delta against
412
464
        the first parent.
 
465
 
 
466
        Any versions not present will be converted into ghosts.
413
467
        """
414
 
        if delta and not parents:
 
468
        ghostless_parents = []
 
469
        ghosts = []
 
470
        for parent in parents:
 
471
            if not self.has_version(parent):
 
472
                ghosts.append(parent)
 
473
            else:
 
474
                ghostless_parents.append(parent)
 
475
 
 
476
        if delta and not len(ghostless_parents):
415
477
            delta = False
416
478
 
417
479
        digest = sha_strings(lines)
422
484
                lines[-1] = lines[-1] + '\n'
423
485
 
424
486
        lines = self.factory.make(lines, len(self._index))
425
 
        if self.factory.annotated and len(parents) > 0:
 
487
        if self.factory.annotated and len(ghostless_parents) > 0:
426
488
            # Merge annotations from parent texts if so is needed.
427
 
            self._merge_annotations(lines, parents)
 
489
            self._merge_annotations(lines, ghostless_parents)
428
490
 
429
 
        if parents and delta:
 
491
        if len(ghostless_parents) and delta:
430
492
            # To speed the extract of texts the delta chain is limited
431
493
            # to a fixed number of deltas.  This should minimize both
432
494
            # I/O and the time spend applying deltas.
433
495
            count = 0
434
 
            delta_parents = parents
 
496
            delta_parents = ghostless_parents
435
497
            while count < 25:
436
498
                parent = delta_parents[0]
437
499
                method = self._index.get_method(parent)
444
506
 
445
507
        if delta:
446
508
            options.append('line-delta')
447
 
            content = self._get_content(parents[0])
 
509
            content = self._get_content(ghostless_parents[0])
448
510
            delta_hunks = content.line_delta(lines)
449
511
            store_lines = self.factory.lower_line_delta(delta_hunks)
450
512
        else:
457
519
    def check(self, progress_bar=None):
458
520
        """See VersionedFile.check()."""
459
521
 
460
 
    def clone_text(self, new_version_id, old_version_id, parents):
 
522
    def _clone_text(self, new_version_id, old_version_id, parents):
461
523
        """See VersionedFile.clone_text()."""
462
524
        # FIXME RBC 20060228 make fast by only inserting an index with null delta.
463
525
        self.add_lines(new_version_id, parents, self.get_lines(old_version_id))
466
528
        """See VersionedFile.get_lines()."""
467
529
        return self._get_content(version_id).text()
468
530
 
 
531
    def iter_lines_added_or_present_in_versions(self, version_ids=None):
 
532
        """See VersionedFile.iter_lines_added_or_present_in_versions()."""
 
533
        if version_ids is None:
 
534
            version_ids = self.versions()
 
535
        # we dont care about inclusions, the caller cares.
 
536
        # but we need to setup a list of records to visit.
 
537
        # we need version_id, position, length
 
538
        version_id_records = []
 
539
        requested_versions = list(version_ids)
 
540
        # filter for available versions
 
541
        for version_id in requested_versions:
 
542
            if not self.has_version(version_id):
 
543
                raise RevisionNotPresent(version_id, self.filename)
 
544
        # get a in-component-order queue:
 
545
        version_ids = []
 
546
        for version_id in self.versions():
 
547
            if version_id in requested_versions:
 
548
                version_ids.append(version_id)
 
549
                data_pos, length = self._index.get_position(version_id)
 
550
                version_id_records.append((version_id, data_pos, length))
 
551
 
 
552
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
553
        count = 0
 
554
        total = len(version_id_records)
 
555
        try:
 
556
            pb.update('Walking content.', count, total)
 
557
            for version_id, data, sha_value in \
 
558
                self._data.read_records_iter(version_id_records):
 
559
                pb.update('Walking content.', count, total)
 
560
                method = self._index.get_method(version_id)
 
561
                version_idx = self._index.lookup(version_id)
 
562
                assert method in ('fulltext', 'line-delta')
 
563
                if method == 'fulltext':
 
564
                    content = self.factory.parse_fulltext(data, version_idx)
 
565
                    for line in content.text():
 
566
                        yield line
 
567
                else:
 
568
                    delta = self.factory.parse_line_delta(data, version_idx)
 
569
                    for start, end, count, lines in delta:
 
570
                        for origin, line in lines:
 
571
                            yield line
 
572
                count +=1
 
573
            pb.update('Walking content.', total, total)
 
574
            pb.finished()
 
575
        except:
 
576
            pb.update('Walking content.', total, total)
 
577
            pb.finished()
 
578
            raise
 
579
        
469
580
    def num_versions(self):
470
581
        """See VersionedFile.num_versions()."""
471
582
        return self._index.num_versions()
483
594
        self._check_versions_present([version_id])
484
595
        return list(self._index.get_parents(version_id))
485
596
 
 
597
    def get_parents_with_ghosts(self, version_id):
 
598
        """See VersionedFile.get_parents."""
 
599
        self._check_versions_present([version_id])
 
600
        return list(self._index.get_parents_with_ghosts(version_id))
 
601
 
486
602
    def get_ancestry(self, versions):
487
603
        """See VersionedFile.get_ancestry."""
488
604
        if isinstance(versions, basestring):
492
608
        self._check_versions_present(versions)
493
609
        return self._index.get_ancestry(versions)
494
610
 
 
611
    def get_ancestry_with_ghosts(self, versions):
 
612
        """See VersionedFile.get_ancestry_with_ghosts."""
 
613
        if isinstance(versions, basestring):
 
614
            versions = [versions]
 
615
        if not versions:
 
616
            return []
 
617
        self._check_versions_present(versions)
 
618
        return self._index.get_ancestry_with_ghosts(versions)
 
619
 
495
620
    def _reannotate_line_delta(self, other, lines, new_version_id,
496
621
                               new_version_idx):
497
622
        """Re-annotate line-delta and return new delta."""
525
650
 
526
651
        return self.factory.lower_fulltext(KnitContent(new_lines))
527
652
 
 
653
    #@deprecated_method(zero_eight)
528
654
    def walk(self, version_ids):
529
655
        """See VersionedFile.walk."""
530
656
        # We take the short path here, and extract all relevant texts
604
730
            self._history.append(version_id)
605
731
 
606
732
    def _iter_index(self, fp):
607
 
        lines = fp.read()
608
 
        for l in lines.splitlines(False):
 
733
        l = fp.readline()
 
734
        while l != '':
609
735
            yield l.split()
 
736
            l = fp.readline()
 
737
        #lines = fp.read()
 
738
        #for l in lines.splitlines(False):
 
739
        #    yield l.split()
610
740
 
611
741
    def __init__(self, transport, filename, mode, create=False):
612
742
        _KnitComponentFile.__init__(self, transport, filename, mode)
616
746
        # so - wc -l of a knit index is != the number of uniqe names
617
747
        # in the weave.
618
748
        self._history = []
 
749
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
619
750
        try:
620
 
            fp = self._transport.get(self._filename)
621
 
            self.check_header(fp)
622
 
            for rec in self._iter_index(fp):
623
 
                self._cache_version(rec[0], rec[1].split(','), int(rec[2]), int(rec[3]),
624
 
                    [self._history[int(i)] for i in rec[4:]])
625
 
        except NoSuchFile, e:
626
 
            if mode != 'w' or not create:
627
 
                raise
628
 
            self.write_header()
 
751
            count = 0
 
752
            total = 1
 
753
            try:
 
754
                pb.update('read knit index', count, total)
 
755
                fp = self._transport.get(self._filename)
 
756
                self.check_header(fp)
 
757
                for rec in self._iter_index(fp):
 
758
                    count += 1
 
759
                    total += 1
 
760
                    pb.update('read knit index', count, total)
 
761
                    parents = self._parse_parents(rec[4:])
 
762
                    self._cache_version(rec[0], rec[1].split(','), int(rec[2]), int(rec[3]),
 
763
                        parents)
 
764
            except NoSuchFile, e:
 
765
                if mode != 'w' or not create:
 
766
                    raise
 
767
                self.write_header()
 
768
        finally:
 
769
            pb.update('read knit index', total, total)
 
770
            pb.finished()
 
771
 
 
772
    def _parse_parents(self, compressed_parents):
 
773
        """convert a list of string parent values into version ids.
 
774
 
 
775
        ints are looked up in the index.
 
776
        .FOO values are ghosts and converted in to FOO.
 
777
        """
 
778
        result = []
 
779
        for value in compressed_parents:
 
780
            if value.startswith('.'):
 
781
                result.append(value[1:])
 
782
            else:
 
783
                assert isinstance(value, str)
 
784
                result.append(self._history[int(value)])
 
785
        return result
629
786
 
630
787
    def get_graph(self):
631
788
        graph = []
641
798
        while len(pending):
642
799
            version = pending.pop()
643
800
            parents = self._cache[version][4]
 
801
            # got the parents ok
 
802
            # trim ghosts
 
803
            parents = [parent for parent in parents if parent in self._cache]
644
804
            for parent in parents:
 
805
                # if not completed and not a ghost
645
806
                if parent not in graph:
646
807
                    pending.add(parent)
647
808
            graph[version] = parents
648
809
        return topo_sort(graph.items())
649
810
 
 
811
    def get_ancestry_with_ghosts(self, versions):
 
812
        """See VersionedFile.get_ancestry_with_ghosts."""
 
813
        # get a graph of all the mentioned versions:
 
814
        graph = {}
 
815
        pending = set(versions)
 
816
        while len(pending):
 
817
            version = pending.pop()
 
818
            try:
 
819
                parents = self._cache[version][4]
 
820
            except KeyError:
 
821
                # ghost, fake it
 
822
                graph[version] = []
 
823
                pass
 
824
            else:
 
825
                # got the parents ok
 
826
                for parent in parents:
 
827
                    if parent not in graph:
 
828
                        pending.add(parent)
 
829
                graph[version] = parents
 
830
        return topo_sort(graph.items())
 
831
 
650
832
    def num_versions(self):
651
833
        return len(self._history)
652
834
 
662
844
        assert version_id in self._cache
663
845
        return self._history.index(version_id)
664
846
 
 
847
    def _version_list_to_index(self, versions):
 
848
        result_list = []
 
849
        for version in versions:
 
850
            if version in self._cache:
 
851
                result_list.append(str(self._history.index(version)))
 
852
            else:
 
853
                result_list.append('.' + version.encode('utf-8'))
 
854
        return ' '.join(result_list)
 
855
 
665
856
    def add_version(self, version_id, options, pos, size, parents):
666
857
        """Add a version record to the index."""
667
858
        self._cache_version(version_id, options, pos, size, parents)
670
861
                                        ','.join(options),
671
862
                                        pos,
672
863
                                        size,
673
 
                                        ' '.join([str(self.lookup(vid)) for 
674
 
                                                  vid in parents]))
 
864
                                        self._version_list_to_index(parents))
675
865
        self._transport.append(self._filename, StringIO(content))
676
866
 
677
867
    def has_version(self, version_id):
696
886
        return self._cache[version_id][1]
697
887
 
698
888
    def get_parents(self, version_id):
699
 
        """Return parents of specified version."""
700
 
        return self._cache[version_id][4]
 
889
        """Return parents of specified version ignoring ghosts."""
 
890
        return [parent for parent in self._cache[version_id][4] 
 
891
                if parent in self._cache]
 
892
 
 
893
    def get_parents_with_ghosts(self, version_id):
 
894
        """Return parents of specified version wth ghosts."""
 
895
        return self._cache[version_id][4] 
701
896
 
702
897
    def check_versions_present(self, version_ids):
703
898
        """Check that all specified versions are present."""
720
915
        self._checked = False
721
916
        if create:
722
917
            self._transport.put(self._filename, StringIO(''))
 
918
        self._records = {}
 
919
 
 
920
    def clear_cache(self):
 
921
        """Clear the record cache."""
 
922
        self._records = {}
723
923
 
724
924
    def _open_file(self):
725
925
        if self._file is None:
739
939
        print >>data_file, "end %s\n" % version_id
740
940
        data_file.close()
741
941
 
 
942
        # cache
 
943
        self._records[version_id] = (digest, lines)
 
944
 
742
945
        content = sio.getvalue()
743
 
        start_pos = self._transport.append(self._filename, StringIO(content))
 
946
        sio.seek(0)
 
947
        start_pos = self._transport.append(self._filename, sio)
744
948
        return start_pos, len(content)
745
949
 
746
950
    def _parse_record(self, version_id, data):
749
953
        if len(rec) != 4:
750
954
            raise KnitCorrupt(self._filename, 'unexpected number of records')
751
955
        if rec[1] != version_id:
752
 
            raise KnitCorrupt(self.file.name, 
753
 
                              'unexpected version, wanted %r' % version_id)
 
956
            raise KnitCorrupt(self._filename, 
 
957
                              'unexpected version, wanted %r, got %r' % (
 
958
                                version_id, rec[1]))
754
959
        lines = int(rec[2])
755
960
        record_contents = self._read_record_contents(df, lines)
756
961
        l = df.readline()
774
979
        contents, digest).
775
980
        """
776
981
 
777
 
        class ContinuousRange:
778
 
            def __init__(self, rec_id, pos, size):
779
 
                self.start_pos = pos
780
 
                self.end_pos = pos + size
781
 
                self.versions = [(rec_id, pos, size)]
782
 
 
783
 
            def add(self, rec_id, pos, size):
784
 
                if self.end_pos != pos:
785
 
                    return False
786
 
                self.end_pos = pos + size
787
 
                self.versions.append((rec_id, pos, size))
788
 
                return True
789
 
 
790
 
            def split(self, fp):
791
 
                for rec_id, pos, size in self.versions:
792
 
                    yield rec_id, fp.read(size)
793
 
 
794
 
        fp = self._open_file()
795
 
 
796
 
        # Loop through all records and try to collect as large
797
 
        # continuous region as possible to read.
798
 
        while records:
799
 
            record_id, pos, size = records.pop(0)
800
 
            continuous_range = ContinuousRange(record_id, pos, size)
801
 
            while records:
802
 
                record_id, pos, size = records[0]
803
 
                if continuous_range.add(record_id, pos, size):
804
 
                    del records[0]
805
 
                else:
806
 
                    break
807
 
            fp.seek(continuous_range.start_pos, 0)
808
 
            for record_id, data in continuous_range.split(fp):
 
982
        needed_records = []
 
983
        for version_id, pos, size in records:
 
984
            if version_id not in self._records:
 
985
                needed_records.append((version_id, pos, size))
 
986
 
 
987
        if len(needed_records):
 
988
            # We take it that the transport optimizes the fetching as good
 
989
            # as possible (ie, reads continous ranges.)
 
990
            response = self._transport.readv(self._filename,
 
991
                [(pos, size) for version_id, pos, size in needed_records])
 
992
 
 
993
            for (record_id, pos, size), (pos, data) in izip(iter(needed_records), response):
809
994
                content, digest = self._parse_record(record_id, data)
810
 
                yield record_id, content, digest
811
 
 
812
 
        self._file = None
 
995
                self._records[record_id] = (digest, content)
 
996
    
 
997
        for version_id, pos, size in records:
 
998
            yield version_id, copy(self._records[version_id][1]), copy(self._records[version_id][0])
813
999
 
814
1000
    def read_records(self, records):
815
1001
        """Read records into a dictionary."""
850
1036
        if not version_ids:
851
1037
            return 0
852
1038
 
853
 
        if pb is None:
854
 
            from bzrlib.progress import DummyProgress
855
 
            pb = DummyProgress()
856
 
 
857
 
        version_ids = list(version_ids)
858
 
        if None in version_ids:
859
 
            version_ids.remove(None)
860
 
 
861
 
        self.source_ancestry = set(self.source.get_ancestry(version_ids))
862
 
        this_versions = set(self.target._index.get_versions())
863
 
        needed_versions = self.source_ancestry - this_versions
864
 
        cross_check_versions = self.source_ancestry.intersection(this_versions)
865
 
        mismatched_versions = set()
866
 
        for version in cross_check_versions:
867
 
            # scan to include needed parents.
868
 
            n1 = set(self.target.get_parents(version))
869
 
            n2 = set(self.source.get_parents(version))
870
 
            if n1 != n2:
871
 
                # FIXME TEST this check for cycles being introduced works
872
 
                # the logic is we have a cycle if in our graph we are an
873
 
                # ancestor of any of the n2 revisions.
874
 
                for parent in n2:
875
 
                    if parent in n1:
876
 
                        # safe
877
 
                        continue
878
 
                    else:
879
 
                        parent_ancestors = self.source.get_ancestry(parent)
880
 
                        if version in parent_ancestors:
881
 
                            raise errors.GraphCycleError([parent, version])
882
 
                # ensure this parent will be available later.
883
 
                new_parents = n2.difference(n1)
884
 
                needed_versions.update(new_parents.difference(this_versions))
885
 
                mismatched_versions.add(version)
886
 
 
887
 
        if not needed_versions and not cross_check_versions:
888
 
            return 0
889
 
        full_list = topo_sort(self.source._index.get_graph())
890
 
 
891
 
        version_list = [i for i in full_list if (not self.target.has_version(i)
892
 
                        and i in needed_versions)]
893
 
 
894
 
        records = []
895
 
        for version_id in version_list:
896
 
            data_pos, data_size = self.source._index.get_position(version_id)
897
 
            records.append((version_id, data_pos, data_size))
898
 
 
899
 
        count = 0
900
 
        for version_id, lines, digest \
901
 
                in self.source._data.read_records_iter(records):
902
 
            options = self.source._index.get_options(version_id)
903
 
            parents = self.source._index.get_parents(version_id)
904
 
            
905
 
            for parent in parents:
906
 
                assert self.target.has_version(parent)
907
 
 
908
 
            if self.target.factory.annotated:
909
 
                # FIXME jrydberg: it should be possible to skip
910
 
                # re-annotating components if we know that we are
911
 
                # going to pull all revisions in the same order.
912
 
                new_version_id = version_id
913
 
                new_version_idx = self.target._index.num_versions()
914
 
                if 'fulltext' in options:
915
 
                    lines = self.target._reannotate_fulltext(self.source, lines,
916
 
                        new_version_id, new_version_idx)
917
 
                elif 'line-delta' in options:
918
 
                    lines = self.target._reannotate_line_delta(self.source, lines,
919
 
                        new_version_id, new_version_idx)
920
 
 
921
 
            count = count + 1
922
 
            pb.update("Joining knit", count, len(version_list))
923
 
 
924
 
            pos, size = self.target._data.add_record(version_id, digest, lines)
925
 
            self.target._index.add_version(version_id, options, pos, size, parents)
926
 
 
927
 
        for version in mismatched_versions:
928
 
            n1 = set(self.target.get_parents(version))
929
 
            n2 = set(self.source.get_parents(version))
930
 
            # write a combined record to our history.
931
 
            new_parents = self.target.get_parents(version) + list(n2.difference(n1))
932
 
            current_values = self.target._index._cache[version]
933
 
            self.target._index.add_version(version,
934
 
                                    current_values[1], 
935
 
                                    current_values[2],
936
 
                                    current_values[3],
937
 
                                    new_parents)
938
 
        pb.clear()
939
 
        return count
 
1039
        pb = bzrlib.ui.ui_factory.nested_progress_bar()
 
1040
        try:
 
1041
            version_ids = list(version_ids)
 
1042
            if None in version_ids:
 
1043
                version_ids.remove(None)
 
1044
    
 
1045
            self.source_ancestry = set(self.source.get_ancestry(version_ids))
 
1046
            this_versions = set(self.target._index.get_versions())
 
1047
            needed_versions = self.source_ancestry - this_versions
 
1048
            cross_check_versions = self.source_ancestry.intersection(this_versions)
 
1049
            mismatched_versions = set()
 
1050
            for version in cross_check_versions:
 
1051
                # scan to include needed parents.
 
1052
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1053
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1054
                if n1 != n2:
 
1055
                    # FIXME TEST this check for cycles being introduced works
 
1056
                    # the logic is we have a cycle if in our graph we are an
 
1057
                    # ancestor of any of the n2 revisions.
 
1058
                    for parent in n2:
 
1059
                        if parent in n1:
 
1060
                            # safe
 
1061
                            continue
 
1062
                        else:
 
1063
                            parent_ancestors = self.source.get_ancestry(parent)
 
1064
                            if version in parent_ancestors:
 
1065
                                raise errors.GraphCycleError([parent, version])
 
1066
                    # ensure this parent will be available later.
 
1067
                    new_parents = n2.difference(n1)
 
1068
                    needed_versions.update(new_parents.difference(this_versions))
 
1069
                    mismatched_versions.add(version)
 
1070
    
 
1071
            if not needed_versions and not cross_check_versions:
 
1072
                return 0
 
1073
            full_list = topo_sort(self.source.get_graph())
 
1074
    
 
1075
            version_list = [i for i in full_list if (not self.target.has_version(i)
 
1076
                            and i in needed_versions)]
 
1077
    
 
1078
            records = []
 
1079
            for version_id in version_list:
 
1080
                data_pos, data_size = self.source._index.get_position(version_id)
 
1081
                records.append((version_id, data_pos, data_size))
 
1082
    
 
1083
            count = 0
 
1084
            for version_id, lines, digest \
 
1085
                    in self.source._data.read_records_iter(records):
 
1086
                options = self.source._index.get_options(version_id)
 
1087
                parents = self.source._index.get_parents_with_ghosts(version_id)
 
1088
                
 
1089
                for parent in parents:
 
1090
                    # if source has the parent, we must hav grabbed it first.
 
1091
                    assert (self.target.has_version(parent) or not
 
1092
                            self.source.has_version(parent))
 
1093
    
 
1094
                if self.target.factory.annotated:
 
1095
                    # FIXME jrydberg: it should be possible to skip
 
1096
                    # re-annotating components if we know that we are
 
1097
                    # going to pull all revisions in the same order.
 
1098
                    new_version_id = version_id
 
1099
                    new_version_idx = self.target._index.num_versions()
 
1100
                    if 'fulltext' in options:
 
1101
                        lines = self.target._reannotate_fulltext(self.source, lines,
 
1102
                            new_version_id, new_version_idx)
 
1103
                    elif 'line-delta' in options:
 
1104
                        lines = self.target._reannotate_line_delta(self.source, lines,
 
1105
                            new_version_id, new_version_idx)
 
1106
    
 
1107
                count = count + 1
 
1108
                pb.update("Joining knit", count, len(version_list))
 
1109
    
 
1110
                pos, size = self.target._data.add_record(version_id, digest, lines)
 
1111
                self.target._index.add_version(version_id, options, pos, size, parents)
 
1112
    
 
1113
            for version in mismatched_versions:
 
1114
                n1 = set(self.target.get_parents_with_ghosts(version))
 
1115
                n2 = set(self.source.get_parents_with_ghosts(version))
 
1116
                # write a combined record to our history preserving the current 
 
1117
                # parents as first in the list
 
1118
                new_parents = self.target.get_parents_with_ghosts(version) + list(n2.difference(n1))
 
1119
                self.target.fix_parents(version, new_parents)
 
1120
            return count
 
1121
        finally:
 
1122
            pb.clear()
 
1123
            pb.finished()
940
1124
 
941
1125
 
942
1126
InterVersionedFile.register_optimiser(InterKnit)