1240
1240
stop_keys = result_parents.difference(start_set)
1241
1241
included_keys = start_set.intersection(result_parents)
1242
1242
start_set.difference_update(included_keys)
1243
recipe = (start_set, stop_keys, len(parents_map))
1243
recipe = ('manual', start_set, stop_keys, len(parents_map))
1244
1244
body = self._serialise_search_recipe(recipe)
1245
1245
path = self.bzrdir._path_for_remote_call(self._client)
1246
1246
for key in keys:
1505
1505
:param recipe: A search recipe (start, stop, count).
1506
1506
:return: Serialised bytes.
1508
start_keys = ' '.join(recipe[0])
1509
stop_keys = ' '.join(recipe[1])
1510
count = str(recipe[2])
1508
start_keys = ' '.join(recipe[1])
1509
stop_keys = ' '.join(recipe[2])
1510
count = str(recipe[3])
1511
1511
return '\n'.join((start_keys, stop_keys, count))
1513
1513
def _serialise_search_result(self, search_result):
1594
1594
"""Stream data from a remote server."""
1596
1596
def get_stream(self, search):
1597
# streaming with fallback repositories is not well defined yet: The
1598
# remote repository cannot see the fallback repositories, and thus
1599
# cannot satisfy the entire search in the general case. Likewise the
1600
# fallback repositories cannot reify the search to determine what they
1601
# should send. It likely needs a return value in the stream listing the
1602
# edge of the search to resume from in fallback repositories.
1603
if self.from_repository._fallback_repositories:
1604
return repository.StreamSource.get_stream(self, search)
1605
repo = self.from_repository
1597
if (self.from_repository._fallback_repositories and
1598
self.to_format._fetch_order == 'topological'):
1599
return self._real_stream(self.from_repository, search)
1600
return self.missing_parents_chain(search, [self.from_repository] +
1601
self.from_repository._fallback_repositories)
1603
def _real_stream(self, repo, search):
1604
"""Get a stream for search from repo.
1606
This never calls RemoteStreamSource.get_stream, and is a helper
1607
for RemoteStreamSource._get_stream to allow getting a stream
1608
reliably whether falling back because of old servers or trying
1609
to stream from a non-RemoteRepository (which the stacked support
1612
source = repo._get_source(self.to_format)
1613
if isinstance(source, RemoteStreamSource):
1614
return repository.StreamSource.get_stream(source, search)
1615
return source.get_stream(search)
1617
def _get_stream(self, repo, search):
1618
"""Core worker to get a stream from repo for search.
1620
This is used by both get_stream and the stacking support logic. It
1621
deliberately gets a stream for repo which does not need to be
1622
self.from_repository. In the event that repo is not Remote, or
1623
cannot do a smart stream, a fallback is made to the generic
1624
repository._get_stream() interface, via self._real_stream.
1626
In the event of stacking, streams from _get_stream will not
1627
contain all the data for search - this is normal (see get_stream).
1629
:param repo: A repository.
1630
:param search: A search.
1632
# Fallbacks may be non-smart
1633
if not isinstance(repo, RemoteRepository):
1634
return self._real_stream(repo, search)
1606
1635
client = repo._client
1607
1636
medium = client._medium
1608
1637
if medium._is_remote_before((1, 13)):
1609
# No possible way this can work.
1610
return repository.StreamSource.get_stream(self, search)
1638
# streaming was added in 1.13
1639
return self._real_stream(repo, search)
1611
1640
path = repo.bzrdir._path_for_remote_call(client)
1613
1642
search_bytes = repo._serialise_search_result(search)
1617
1646
response_tuple, response_handler = response
1618
1647
except errors.UnknownSmartMethod:
1619
1648
medium._remember_remote_is_before((1,13))
1620
return repository.StreamSource.get_stream(self, search)
1649
return self._real_stream(repo, search)
1621
1650
if response_tuple[0] != 'ok':
1622
1651
raise errors.UnexpectedSmartServerResponse(response_tuple)
1623
1652
byte_stream = response_handler.read_streamed_body()
1628
1657
src_format.network_name(), repo._format.network_name()))
1660
def missing_parents_chain(self, search, sources):
1661
"""Chain multiple streams together to handle stacking.
1663
:param search: The overall search to satisfy with streams.
1664
:param sources: A list of Repository objects to query.
1666
self.serialiser = self.to_format._serializer
1667
self.seen_revs = set()
1668
self.referenced_revs = set()
1669
# If there are heads in the search, or the key count is > 0, we are not
1671
while not search.is_empty() and len(sources) > 1:
1672
source = sources.pop(0)
1673
stream = self._get_stream(source, search)
1674
for kind, substream in stream:
1675
if kind != 'revisions':
1676
yield kind, substream
1678
yield kind, self.missing_parents_rev_handler(substream)
1679
search = search.refine(self.seen_revs, self.referenced_revs)
1680
self.seen_revs = set()
1681
self.referenced_revs = set()
1682
if not search.is_empty():
1683
for kind, stream in self._get_stream(sources[0], search):
1686
def missing_parents_rev_handler(self, substream):
1687
for content in substream:
1688
revision_bytes = content.get_bytes_as('fulltext')
1689
revision = self.serialiser.read_revision_from_string(revision_bytes)
1690
self.seen_revs.add(content.key[-1])
1691
self.referenced_revs.update(revision.parent_ids)
1632
1695
class RemoteBranchLockableFiles(LockableFiles):
1633
1696
"""A 'LockableFiles' implementation that talks to a smart server.