~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/knit.py

merge bzr.dev

Show diffs side-by-side

added added

removed removed

Lines of Context:
74
74
lazy_import(globals(), """
75
75
from bzrlib import (
76
76
    annotate,
 
77
    lru_cache,
77
78
    pack,
78
79
    trace,
79
80
    )
575
576
        """Get a data stream for the specified versions.
576
577
 
577
578
        Versions may be returned in any order, not necessarily the order
578
 
        specified.
 
579
        specified.  They are returned in a partial order by compression
 
580
        parent, so that the deltas can be applied as the data stream is
 
581
        inserted; however note that compression parents will not be sent
 
582
        unless they were specifically requested, as the client may already
 
583
        have them.
579
584
 
580
585
        :param required_versions: The exact set of versions to be extracted.
581
586
            Unlike some other knit methods, this is not used to generate a
584
589
        :returns: format_signature, list of (version, options, length, parents),
585
590
            reader_callable.
586
591
        """
587
 
        if not isinstance(required_versions, set):
588
 
            required_versions = set(required_versions)
589
 
        # we don't care about inclusions, the caller cares.
590
 
        # but we need to setup a list of records to visit.
 
592
        required_version_set = frozenset(required_versions)
 
593
        version_index = {}
 
594
        # list of revisions that can just be sent without waiting for their
 
595
        # compression parent
 
596
        ready_to_send = []
 
597
        # map from revision to the children based on it
 
598
        deferred = {}
 
599
        # first, read all relevant index data, enough to sort into the right
 
600
        # order to return
591
601
        for version_id in required_versions:
592
602
            if not self.has_version(version_id):
593
603
                raise RevisionNotPresent(version_id, self.filename)
594
 
        # Pick the desired versions out of the index in oldest-to-newest order
595
 
        version_list = []
596
 
        for version_id in self.versions():
597
 
            if version_id in required_versions:
598
 
                version_list.append(version_id)
599
 
 
600
 
        # create the list of version information for the result
601
 
        copy_queue_records = []
602
 
        copy_set = set()
603
 
        result_version_list = []
604
 
        for version_id in version_list:
605
604
            options = self._index.get_options(version_id)
606
605
            parents = self._index.get_parents_with_ghosts(version_id)
607
606
            index_memo = self._index.get_position(version_id)
 
607
            version_index[version_id] = (index_memo, options, parents)
 
608
            if parents and parents[0] in required_version_set:
 
609
                # must wait until the parent has been sent
 
610
                deferred.setdefault(parents[0], []). \
 
611
                    append(version_id)
 
612
            else:
 
613
                # either a fulltext, or a delta whose parent the client did
 
614
                # not ask for and presumably already has
 
615
                ready_to_send.append(version_id)
 
616
        # build a list of results to return, plus instructions for data to
 
617
        # read from the file
 
618
        copy_queue_records = []
 
619
        temp_version_list = []
 
620
        while ready_to_send:
 
621
            # XXX: pushing and popping lists may be a bit inefficient
 
622
            version_id = ready_to_send.pop(0)
 
623
            (index_memo, options, parents) = version_index[version_id]
608
624
            copy_queue_records.append((version_id, index_memo))
609
625
            none, data_pos, data_size = index_memo
610
 
            copy_set.add(version_id)
611
 
            # version, options, length, parents
612
 
            result_version_list.append((version_id, options, data_size,
 
626
            temp_version_list.append((version_id, options, data_size,
613
627
                parents))
614
 
 
615
 
        # Read the compressed record data.
616
 
        # XXX:
617
 
        # From here down to the return should really be logic in the returned
618
 
        # callable -- in a class that adapts read_records_iter_raw to read
619
 
        # requests.
 
628
            if version_id in deferred:
 
629
                # now we can send all the children of this revision - we could
 
630
                # put them in anywhere, but we hope that sending them soon
 
631
                # after the fulltext will give good locality in the receiver
 
632
                ready_to_send[:0] = deferred.pop(version_id)
 
633
        assert len(deferred) == 0, \
 
634
            "Still have compressed child versions waiting to be sent"
 
635
        # XXX: The stream format is such that we cannot stream it - we have to
 
636
        # know the length of all the data a-priori.
620
637
        raw_datum = []
 
638
        result_version_list = []
621
639
        for (version_id, raw_data), \
622
640
            (version_id2, options, _, parents) in \
623
641
            izip(self._data.read_records_iter_raw(copy_queue_records),
624
 
                 result_version_list):
625
 
            assert version_id == version_id2, 'logic error, inconsistent results'
 
642
                 temp_version_list):
 
643
            assert version_id == version_id2, \
 
644
                'logic error, inconsistent results'
626
645
            raw_datum.append(raw_data)
 
646
            result_version_list.append(
 
647
                (version_id, options, len(raw_data), parents))
 
648
        # provide a callback to get data incrementally.
627
649
        pseudo_file = StringIO(''.join(raw_datum))
628
650
        def read(length):
629
651
            if length is None:
1117
1139
                line_iterator = self.factory.get_fulltext_content(data)
1118
1140
            else:
1119
1141
                line_iterator = self.factory.get_linedelta_content(data)
 
1142
            # XXX: It might be more efficient to yield (version_id,
 
1143
            # line_iterator) in the future. However for now, this is a simpler
 
1144
            # change to integrate into the rest of the codebase. RBC 20071110
1120
1145
            for line in line_iterator:
1121
 
                yield line
 
1146
                yield line, version_id
1122
1147
 
1123
1148
        pb.update('Walking content.', total, total)
1124
1149
        
2221
2246
        except AttributeError:
2222
2247
            return False
2223
2248
 
 
2249
    def _copy_texts(self, pb, msg, version_ids, ignore_missing=False):
 
2250
        """Copy texts to the target by extracting and adding them one by one.
 
2251
 
 
2252
        see join() for the parameter definitions.
 
2253
        """
 
2254
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
 
2255
        graph = self.source.get_graph(version_ids)
 
2256
        order = topo_sort(graph.items())
 
2257
 
 
2258
        def size_of_content(content):
 
2259
            return sum(len(line) for line in content.text())
 
2260
        # Cache at most 10MB of parent texts
 
2261
        parent_cache = lru_cache.LRUSizeCache(max_size=10*1024*1024,
 
2262
                                              compute_size=size_of_content)
 
2263
        # TODO: jam 20071116 It would be nice to have a streaming interface to
 
2264
        #       get multiple texts from a source. The source could be smarter
 
2265
        #       about how it handled intermediate stages.
 
2266
        #       get_line_list() or make_mpdiffs() seem like a possibility, but
 
2267
        #       at the moment they extract all full texts into memory, which
 
2268
        #       causes us to store more than our 3x fulltext goal.
 
2269
        #       Repository.iter_files_bytes() may be another possibility
 
2270
        to_process = [version for version in order
 
2271
                               if version not in self.target]
 
2272
        total = len(to_process)
 
2273
        pb = ui.ui_factory.nested_progress_bar()
 
2274
        try:
 
2275
            for index, version in enumerate(to_process):
 
2276
                pb.update('Converting versioned data', index, total)
 
2277
                sha1, num_bytes, parent_text = self.target.add_lines(version,
 
2278
                    self.source.get_parents(version),
 
2279
                    self.source.get_lines(version),
 
2280
                    parent_texts=parent_cache)
 
2281
                parent_cache[version] = parent_text
 
2282
        finally:
 
2283
            pb.finished()
 
2284
        return total
 
2285
 
2224
2286
    def join(self, pb=None, msg=None, version_ids=None, ignore_missing=False):
2225
2287
        """See InterVersionedFile.join."""
2226
2288
        assert isinstance(self.source, KnitVersionedFile)
2233
2295
        elif self.source.factory.annotated:
2234
2296
            converter = self._anno_to_plain_converter
2235
2297
        else:
2236
 
            # We're converting from a plain to an annotated knit. This requires
2237
 
            # building the annotations from scratch. The generic join code
2238
 
            # handles this implicitly so we delegate to it.
2239
 
            return super(InterKnit, self).join(pb, msg, version_ids,
2240
 
                ignore_missing)
 
2298
            # We're converting from a plain to an annotated knit. Copy them
 
2299
            # across by full texts.
 
2300
            return self._copy_texts(pb, msg, version_ids, ignore_missing)
2241
2301
 
2242
2302
        version_ids = self._get_source_version_ids(version_ids, ignore_missing)
2243
2303
        if not version_ids: