~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/remote.py

(jelmer) Add HPSS call for ``Repository.iter_files_bytes``. (Jelmer Vernooij)

Show diffs side-by-side

added added

removed removed

Lines of Context:
29
29
    graph,
30
30
    lock,
31
31
    lockdir,
 
32
    osutils,
32
33
    registry,
33
34
    repository as _mod_repository,
34
35
    revision as _mod_revision,
1784
1785
 
1785
1786
    @needs_read_lock
1786
1787
    def get_inventory(self, revision_id):
1787
 
        self._ensure_real()
1788
 
        return self._real_repository.get_inventory(revision_id)
 
1788
        return list(self.iter_inventories([revision_id]))[0]
1789
1789
 
1790
1790
    def iter_inventories(self, revision_ids, ordering=None):
1791
1791
        self._ensure_real()
1918
1918
        return self._real_repository._get_versioned_file_checker(
1919
1919
            revisions, revision_versions_cache)
1920
1920
 
 
1921
    def _iter_files_bytes_rpc(self, desired_files, absent):
 
1922
        path = self.bzrdir._path_for_remote_call(self._client)
 
1923
        lines = []
 
1924
        identifiers = []
 
1925
        for (file_id, revid, identifier) in desired_files:
 
1926
            lines.append("%s\0%s" % (
 
1927
                osutils.safe_file_id(file_id),
 
1928
                osutils.safe_revision_id(revid)))
 
1929
            identifiers.append(identifier)
 
1930
        (response_tuple, response_handler) = (
 
1931
            self._call_with_body_bytes_expecting_body(
 
1932
            "Repository.iter_files_bytes", (path, ), "\n".join(lines)))
 
1933
        if response_tuple != ('ok', ):
 
1934
            response_handler.cancel_read_body()
 
1935
            raise errors.UnexpectedSmartServerResponse(response_tuple)
 
1936
        byte_stream = response_handler.read_streamed_body()
 
1937
        def decompress_stream(start, byte_stream, unused):
 
1938
            decompressor = zlib.decompressobj()
 
1939
            yield decompressor.decompress(start)
 
1940
            while decompressor.unused_data == "":
 
1941
                try:
 
1942
                    data = byte_stream.next()
 
1943
                except StopIteration:
 
1944
                    break
 
1945
                yield decompressor.decompress(data)
 
1946
            yield decompressor.flush()
 
1947
            unused.append(decompressor.unused_data)
 
1948
        unused = ""
 
1949
        while True:
 
1950
            while not "\n" in unused:
 
1951
                unused += byte_stream.next()
 
1952
            header, rest = unused.split("\n", 1)
 
1953
            args = header.split("\0")
 
1954
            if args[0] == "absent":
 
1955
                absent[identifiers[int(args[3])]] = (args[1], args[2])
 
1956
                unused = rest
 
1957
                continue
 
1958
            elif args[0] == "ok":
 
1959
                idx = int(args[1])
 
1960
            else:
 
1961
                raise errors.UnexpectedSmartServerResponse(args)
 
1962
            unused_chunks = []
 
1963
            yield (identifiers[idx],
 
1964
                decompress_stream(rest, byte_stream, unused_chunks))
 
1965
            unused = "".join(unused_chunks)
 
1966
 
1921
1967
    def iter_files_bytes(self, desired_files):
1922
1968
        """See Repository.iter_files_bytes.
1923
1969
        """
1924
 
        self._ensure_real()
1925
 
        return self._real_repository.iter_files_bytes(desired_files)
 
1970
        try:
 
1971
            absent = {}
 
1972
            for (identifier, bytes_iterator) in self._iter_files_bytes_rpc(
 
1973
                    desired_files, absent):
 
1974
                yield identifier, bytes_iterator
 
1975
            for fallback in self._fallback_repositories:
 
1976
                if not absent:
 
1977
                    break
 
1978
                desired_files = [(key[0], key[1], identifier) for
 
1979
                    (identifier, key) in absent.iteritems()]
 
1980
                for (identifier, bytes_iterator) in fallback.iter_files_bytes(desired_files):
 
1981
                    del absent[identifier]
 
1982
                    yield identifier, bytes_iterator
 
1983
            if absent:
 
1984
                # There may be more missing items, but raise an exception
 
1985
                # for just one.
 
1986
                missing_identifier = absent.keys()[0]
 
1987
                missing_key = absent[missing_identifier]
 
1988
                raise errors.RevisionNotPresent(revision_id=missing_key[1],
 
1989
                    file_id=missing_key[0])
 
1990
        except errors.UnknownSmartMethod:
 
1991
            self._ensure_real()
 
1992
            for (identifier, bytes_iterator) in (
 
1993
                self._real_repository.iter_files_bytes(desired_files)):
 
1994
                yield identifier, bytes_iterator
1926
1995
 
1927
1996
    def get_cached_parent_map(self, revision_ids):
1928
1997
        """See bzrlib.CachingParentsProvider.get_cached_parent_map"""
2111
2180
 
2112
2181
    @needs_read_lock
2113
2182
    def revision_trees(self, revision_ids):
2114
 
        self._ensure_real()
2115
 
        return self._real_repository.revision_trees(revision_ids)
 
2183
        inventories = self.iter_inventories(revision_ids)
 
2184
        for inv in inventories:
 
2185
            yield InventoryRevisionTree(self, inv, inv.revision_id)
2116
2186
 
2117
2187
    @needs_read_lock
2118
2188
    def get_revision_reconcile(self, revision_id):
2137
2207
    def _copy_repository_tarball(self, to_bzrdir, revision_id=None):
2138
2208
        # get a tarball of the remote repository, and copy from that into the
2139
2209
        # destination
2140
 
        from bzrlib import osutils
2141
2210
        import tarfile
2142
2211
        # TODO: Maybe a progress bar while streaming the tarball?
2143
2212
        note(gettext("Copying repository content as tarball..."))
2197
2266
    def revisions(self):
2198
2267
        """Decorate the real repository for now.
2199
2268
 
2200
 
        In the short term this should become a real object to intercept graph
2201
 
        lookups.
2202
 
 
2203
2269
        In the long term a full blown network facility is needed.
2204
2270
        """
2205
2271
        self._ensure_real()
2336
2402
            return self._real_repository.add_signature_text(
2337
2403
                revision_id, signature)
2338
2404
        path = self.bzrdir._path_for_remote_call(self._client)
2339
 
        response, response_handler = self._call_with_body_bytes(
2340
 
            'Repository.add_signature_text', (path, revision_id),
2341
 
            signature)
 
2405
        response, handler = self._call_with_body_bytes_expecting_body(
 
2406
            'Repository.add_signature_text', (path, self._lock_token,
 
2407
                revision_id) + tuple(self._write_group_tokens), signature)
 
2408
        handler.cancel_read_body()
2342
2409
        self.refresh_data()
2343
2410
        if response[0] != 'ok':
2344
2411
            raise errors.UnexpectedSmartServerResponse(response)
 
2412
        self._write_group_tokens = response[1:]
2345
2413
 
2346
2414
    def has_signature_for_revision_id(self, revision_id):
2347
2415
        path = self.bzrdir._path_for_remote_call(self._client)
3855
3923
    lambda err, find, get_path: errors.ReadError(get_path()))
3856
3924
error_translators.register('NoSuchFile',
3857
3925
    lambda err, find, get_path: errors.NoSuchFile(get_path()))
 
3926
error_translators.register('UnsuspendableWriteGroup',
 
3927
    lambda err, find, get_path: errors.UnsuspendableWriteGroup(
 
3928
        repository=find('repository')))
 
3929
error_translators.register('UnresumableWriteGroup',
 
3930
    lambda err, find, get_path: errors.UnresumableWriteGroup(
 
3931
        repository=find('repository'), write_groups=err.error_args[0],
 
3932
        reason=err.error_args[1]))
3858
3933
no_context_error_translators.register('IncompatibleRepositories',
3859
3934
    lambda err: errors.IncompatibleRepositories(
3860
3935
        err.error_args[0], err.error_args[1], err.error_args[2]))
3905
3980
no_context_error_translators.register('MemoryError',
3906
3981
    lambda err: errors.BzrError("remote server out of memory\n"
3907
3982
        "Retry non-remotely, or contact the server admin for details."))
 
3983
no_context_error_translators.register('RevisionNotPresent',
 
3984
    lambda err: errors.RevisionNotPresent(err.error_args[0], err.error_args[1]))
3908
3985
 
3909
3986
no_context_error_translators.register('BzrCheckError',
3910
3987
    lambda err: errors.BzrCheckError(msg=err.error_args[0]))
3911
3988
 
3912
 
error_translators.register('UnsuspendableWriteGroup',
3913
 
    lambda err, find, get_path: errors.UnsuspendableWriteGroup(
3914
 
        repository=find('repository')))
3915
 
error_translators.register('UnresumableWriteGroup',
3916
 
    lambda err, find, get_path: errors.UnresumableWriteGroup(
3917
 
        repository=find('repository'), write_groups=err.error_args[0],
3918
 
        reason=err.error_args[1]))