516
525
if len(branch_info) != 2:
517
526
raise errors.UnexpectedSmartServerResponse(response)
518
527
branch_ref, branch_name = branch_info
519
format = controldir.network_format_registry.get(control_name)
529
format = controldir.network_format_registry.get(control_name)
531
raise errors.UnknownFormatError(kind='control', format=control_name)
521
format.repository_format = _mod_repository.network_format_registry.get(
535
format.repository_format = _mod_repository.network_format_registry.get(
538
raise errors.UnknownFormatError(kind='repository',
523
540
if branch_ref == 'ref':
524
541
# XXX: we need possible_transports here to avoid reopening the
525
542
# connection to the referenced location
668
690
# a branch reference, use the existing BranchReference logic.
669
691
format = BranchReferenceFormat()
670
692
return format.open(self, name=name, _found=True,
671
location=response[1], ignore_fallbacks=ignore_fallbacks)
693
location=response[1], ignore_fallbacks=ignore_fallbacks,
694
possible_transports=possible_transports)
672
695
branch_format_name = response[1]
673
696
if not branch_format_name:
674
697
branch_format_name = None
675
698
format = RemoteBranchFormat(network_name=branch_format_name)
676
699
return RemoteBranch(self, self.find_repository(), format=format,
677
setup_stacking=not ignore_fallbacks, name=name)
700
setup_stacking=not ignore_fallbacks, name=name,
701
possible_transports=possible_transports)
679
703
def _open_repo_v1(self, path):
680
704
verb = 'BzrDir.find_repository'
1869
1896
# the InterRepository base class, which raises an
1870
1897
# IncompatibleRepositories when asked to fetch.
1871
1898
inter = _mod_repository.InterRepository.get(source, self)
1899
if (fetch_spec is not None and
1900
not getattr(inter, "supports_fetch_spec", False)):
1901
raise errors.UnsupportedOperation(
1902
"fetch_spec not supported for %r" % inter)
1872
1903
return inter.fetch(revision_id=revision_id,
1873
1904
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1892
1923
return self._real_repository._get_versioned_file_checker(
1893
1924
revisions, revision_versions_cache)
1926
def _iter_files_bytes_rpc(self, desired_files, absent):
1927
path = self.bzrdir._path_for_remote_call(self._client)
1930
for (file_id, revid, identifier) in desired_files:
1931
lines.append("%s\0%s" % (
1932
osutils.safe_file_id(file_id),
1933
osutils.safe_revision_id(revid)))
1934
identifiers.append(identifier)
1935
(response_tuple, response_handler) = (
1936
self._call_with_body_bytes_expecting_body(
1937
"Repository.iter_files_bytes", (path, ), "\n".join(lines)))
1938
if response_tuple != ('ok', ):
1939
response_handler.cancel_read_body()
1940
raise errors.UnexpectedSmartServerResponse(response_tuple)
1941
byte_stream = response_handler.read_streamed_body()
1942
def decompress_stream(start, byte_stream, unused):
1943
decompressor = zlib.decompressobj()
1944
yield decompressor.decompress(start)
1945
while decompressor.unused_data == "":
1947
data = byte_stream.next()
1948
except StopIteration:
1950
yield decompressor.decompress(data)
1951
yield decompressor.flush()
1952
unused.append(decompressor.unused_data)
1955
while not "\n" in unused:
1956
unused += byte_stream.next()
1957
header, rest = unused.split("\n", 1)
1958
args = header.split("\0")
1959
if args[0] == "absent":
1960
absent[identifiers[int(args[3])]] = (args[1], args[2])
1963
elif args[0] == "ok":
1966
raise errors.UnexpectedSmartServerResponse(args)
1968
yield (identifiers[idx],
1969
decompress_stream(rest, byte_stream, unused_chunks))
1970
unused = "".join(unused_chunks)
1895
1972
def iter_files_bytes(self, desired_files):
1896
1973
"""See Repository.iter_file_bytes.
1899
return self._real_repository.iter_files_bytes(desired_files)
1977
for (identifier, bytes_iterator) in self._iter_files_bytes_rpc(
1978
desired_files, absent):
1979
yield identifier, bytes_iterator
1980
for fallback in self._fallback_repositories:
1983
desired_files = [(key[0], key[1], identifier) for
1984
(identifier, key) in absent.iteritems()]
1985
for (identifier, bytes_iterator) in fallback.iter_files_bytes(desired_files):
1986
del absent[identifier]
1987
yield identifier, bytes_iterator
1989
# There may be more missing items, but raise an exception
1991
missing_identifier = absent.keys()[0]
1992
missing_key = absent[missing_identifier]
1993
raise errors.RevisionNotPresent(revision_id=missing_key[1],
1994
file_id=missing_key[0])
1995
except errors.UnknownSmartMethod:
1997
for (identifier, bytes_iterator) in (
1998
self._real_repository.iter_files_bytes(desired_files)):
1999
yield identifier, bytes_iterator
1901
2001
def get_cached_parent_map(self, revision_ids):
1902
2002
"""See bzrlib.CachingParentsProvider.get_cached_parent_map"""
1967
2067
parents_map = {}
1968
2068
if _DEFAULT_SEARCH_DEPTH <= 0:
1969
2069
(start_set, stop_keys,
1970
key_count) = graph.search_result_from_parent_map(
2070
key_count) = vf_search.search_result_from_parent_map(
1971
2071
parents_map, self._unstacked_provider.missing_keys)
1973
2073
(start_set, stop_keys,
1974
key_count) = graph.limited_search_result_from_parent_map(
2074
key_count) = vf_search.limited_search_result_from_parent_map(
1975
2075
parents_map, self._unstacked_provider.missing_keys,
1976
2076
keys, depth=_DEFAULT_SEARCH_DEPTH)
1977
2077
recipe = ('manual', start_set, stop_keys, key_count)
2029
2129
@needs_read_lock
2030
2130
def get_signature_text(self, revision_id):
2032
return self._real_repository.get_signature_text(revision_id)
2131
path = self.bzrdir._path_for_remote_call(self._client)
2133
response_tuple, response_handler = self._call_expecting_body(
2134
'Repository.get_revision_signature_text', path, revision_id)
2135
except errors.UnknownSmartMethod:
2137
return self._real_repository.get_signature_text(revision_id)
2138
except errors.NoSuchRevision, err:
2139
for fallback in self._fallback_repositories:
2141
return fallback.get_signature_text(revision_id)
2142
except errors.NoSuchRevision:
2146
if response_tuple[0] != 'ok':
2147
raise errors.UnexpectedSmartServerResponse(response_tuple)
2148
return response_handler.read_body_bytes()
2034
2150
@needs_read_lock
2035
2151
def _get_inventory_xml(self, revision_id):
2133
2249
@needs_write_lock
2134
2250
def pack(self, hint=None, clean_obsolete_packs=False):
2135
2251
"""Compress the data within the repository.
2137
This is not currently implemented within the smart server.
2140
return self._real_repository.pack(hint=hint, clean_obsolete_packs=clean_obsolete_packs)
2256
body = "".join([l+"\n" for l in hint])
2257
path = self.bzrdir._path_for_remote_call(self._client)
2259
response, handler = self._call_with_body_bytes_expecting_body(
2260
'Repository.pack', (path, self._lock_token,
2261
str(clean_obsolete_packs)), body)
2262
except errors.UnknownSmartMethod:
2264
return self._real_repository.pack(hint=hint,
2265
clean_obsolete_packs=clean_obsolete_packs)
2266
handler.cancel_read_body()
2267
if response != ('ok', ):
2268
raise errors.UnexpectedSmartServerResponse(response)
2143
2271
def revisions(self):
2144
2272
"""Decorate the real repository for now.
2146
In the short term this should become a real object to intercept graph
2149
2274
In the long term a full blown network facility is needed.
2151
2276
self._ensure_real()
2193
2318
self._ensure_real()
2194
2319
return self._real_repository.texts
2321
def _iter_revisions_rpc(self, revision_ids):
2322
body = "\n".join(revision_ids)
2323
path = self.bzrdir._path_for_remote_call(self._client)
2324
response_tuple, response_handler = (
2325
self._call_with_body_bytes_expecting_body(
2326
"Repository.iter_revisions", (path, ), body))
2327
if response_tuple[0] != "ok":
2328
raise errors.UnexpectedSmartServerResponse(response_tuple)
2329
serializer_format = response_tuple[1]
2330
serializer = serializer_format_registry.get(serializer_format)
2331
byte_stream = response_handler.read_streamed_body()
2332
decompressor = zlib.decompressobj()
2334
for bytes in byte_stream:
2335
chunks.append(decompressor.decompress(bytes))
2336
if decompressor.unused_data != "":
2337
chunks.append(decompressor.flush())
2338
yield serializer.read_revision_from_string("".join(chunks))
2339
unused = decompressor.unused_data
2340
decompressor = zlib.decompressobj()
2341
chunks = [decompressor.decompress(unused)]
2342
chunks.append(decompressor.flush())
2343
text = "".join(chunks)
2345
yield serializer.read_revision_from_string("".join(chunks))
2196
2347
@needs_read_lock
2197
2348
def get_revisions(self, revision_ids):
2199
return self._real_repository.get_revisions(revision_ids)
2349
if revision_ids is None:
2350
revision_ids = self.all_revision_ids()
2352
for rev_id in revision_ids:
2353
if not rev_id or not isinstance(rev_id, basestring):
2354
raise errors.InvalidRevisionId(
2355
revision_id=rev_id, branch=self)
2357
missing = set(revision_ids)
2359
for rev in self._iter_revisions_rpc(revision_ids):
2360
missing.remove(rev.revision_id)
2361
revs[rev.revision_id] = rev
2362
except errors.UnknownSmartMethod:
2364
return self._real_repository.get_revisions(revision_ids)
2365
for fallback in self._fallback_repositories:
2368
for revid in list(missing):
2369
# XXX JRV 2011-11-20: It would be nice if there was a
2370
# public method on Repository that could be used to query
2371
# for revision objects *without* failing completely if one
2372
# was missing. There is VersionedFileRepository._iter_revisions,
2373
# but unfortunately that's private and not provided by
2374
# all repository implementations.
2376
revs[revid] = fallback.get_revision(revid)
2377
except errors.NoSuchRevision:
2380
missing.remove(revid)
2382
raise errors.NoSuchRevision(self, list(missing)[0])
2383
return [revs[revid] for revid in revision_ids]
2201
2385
def supports_rich_root(self):
2202
2386
return self._format.rich_root_data
2223
2407
return self._real_repository.add_signature_text(
2224
2408
revision_id, signature)
2225
2409
path = self.bzrdir._path_for_remote_call(self._client)
2226
response, response_handler = self._call_with_body_bytes(
2227
'Repository.add_signature_text', (path, revision_id),
2410
response, handler = self._call_with_body_bytes_expecting_body(
2411
'Repository.add_signature_text', (path, self._lock_token,
2412
revision_id) + tuple(self._write_group_tokens), signature)
2413
handler.cancel_read_body()
2229
2414
self.refresh_data()
2230
2415
if response[0] != 'ok':
2231
2416
raise errors.UnexpectedSmartServerResponse(response)
2417
self._write_group_tokens = response[1:]
2233
2419
def has_signature_for_revision_id(self, revision_id):
2234
2420
path = self.bzrdir._path_for_remote_call(self._client)
2449
2640
def _real_stream(self, repo, search):
2450
2641
"""Get a stream for search from repo.
2452
This never called RemoteStreamSource.get_stream, and is a heler
2453
for RemoteStreamSource._get_stream to allow getting a stream
2643
This never called RemoteStreamSource.get_stream, and is a helper
2644
for RemoteStreamSource._get_stream to allow getting a stream
2454
2645
reliably whether fallback back because of old servers or trying
2455
2646
to stream from a non-RemoteRepository (which the stacked support
2799
2994
def __init__(self, remote_bzrdir, remote_repository, real_branch=None,
2800
_client=None, format=None, setup_stacking=True, name=None):
2995
_client=None, format=None, setup_stacking=True, name=None,
2996
possible_transports=None):
2801
2997
"""Create a RemoteBranch instance.
2803
2999
:param real_branch: An optional local implementation of the branch
3727
3928
lambda err, find, get_path: errors.ReadError(get_path()))
3728
3929
error_translators.register('NoSuchFile',
3729
3930
lambda err, find, get_path: errors.NoSuchFile(get_path()))
3931
error_translators.register('UnsuspendableWriteGroup',
3932
lambda err, find, get_path: errors.UnsuspendableWriteGroup(
3933
repository=find('repository')))
3934
error_translators.register('UnresumableWriteGroup',
3935
lambda err, find, get_path: errors.UnresumableWriteGroup(
3936
repository=find('repository'), write_groups=err.error_args[0],
3937
reason=err.error_args[1]))
3730
3938
no_context_error_translators.register('IncompatibleRepositories',
3731
3939
lambda err: errors.IncompatibleRepositories(
3732
3940
err.error_args[0], err.error_args[1], err.error_args[2]))
3777
3985
no_context_error_translators.register('MemoryError',
3778
3986
lambda err: errors.BzrError("remote server out of memory\n"
3779
3987
"Retry non-remotely, or contact the server admin for details."))
3988
no_context_error_translators.register('RevisionNotPresent',
3989
lambda err: errors.RevisionNotPresent(err.error_args[0], err.error_args[1]))
3781
3991
no_context_error_translators.register('BzrCheckError',
3782
3992
lambda err: errors.BzrCheckError(msg=err.error_args[0]))
3784
error_translators.register('UnsuspendableWriteGroup',
3785
lambda err, find, get_path: errors.UnsuspendableWriteGroup(
3786
repository=find('repository')))
3787
error_translators.register('UnresumableWriteGroup',
3788
lambda err, find, get_path: errors.UnresumableWriteGroup(
3789
repository=find('repository'), write_groups=err.error_args[0],
3790
reason=err.error_args[1]))