73
73
def response_tuple_to_repo_format(response):
74
74
"""Convert a response tuple describing a repository format to a format."""
75
75
format = RemoteRepositoryFormat()
76
format.rich_root_data = (response[0] == 'yes')
77
format.supports_tree_reference = (response[1] == 'yes')
78
format.supports_external_lookups = (response[2] == 'yes')
76
format._rich_root_data = (response[0] == 'yes')
77
format._supports_tree_reference = (response[1] == 'yes')
78
format._supports_external_lookups = (response[2] == 'yes')
79
79
format._network_name = response[3]
412
412
self._custom_format = None
413
413
self._network_name = None
414
414
self._creating_bzrdir = None
415
self._supports_external_lookups = None
416
self._supports_tree_reference = None
417
self._rich_root_data = None
420
def rich_root_data(self):
421
if self._rich_root_data is None:
423
self._rich_root_data = self._custom_format.rich_root_data
424
return self._rich_root_data
427
def supports_external_lookups(self):
428
if self._supports_external_lookups is None:
430
self._supports_external_lookups = \
431
self._custom_format.supports_external_lookups
432
return self._supports_external_lookups
435
def supports_tree_reference(self):
436
if self._supports_tree_reference is None:
438
self._supports_tree_reference = \
439
self._custom_format.supports_tree_reference
440
return self._supports_tree_reference
416
442
def _vfs_initialize(self, a_bzrdir, shared):
417
443
"""Helper for common code in initialize."""
627
653
"""Ensure that there is a _real_repository set.
629
655
Used before calls to self._real_repository.
657
Note that _ensure_real causes many roundtrips to the server which are
658
not desirable, and prevents the use of smart one-roundtrip RPCs to
659
perform complex operations (such as accessing parent data, streaming
660
revisions etc). Adding calls to _ensure_real should only be done when
661
bringing up new functionality, adding fallbacks for smart methods that
662
require a fallback path, and never to replace an existing smart method
663
invocation. If in doubt chat to the bzr network team.
631
665
if self._real_repository is None:
632
666
self.bzrdir._ensure_real()
1006
1040
:param repository: A repository.
1008
# XXX: At the moment the RemoteRepository will allow fallbacks
1009
# unconditionally - however, a _real_repository will usually exist,
1010
# and may raise an error if it's not accommodated by the underlying
1011
# format. Eventually we should check when opening the repository
1012
# whether it's willing to allow them or not.
1042
if not self._format.supports_external_lookups:
1043
raise errors.UnstackableRepositoryFormat(
1044
self._format.network_name(), self.base)
1014
1045
# We need to accumulate additional repositories here, to pass them in
1015
1046
# on various RPCs.
1069
1100
self._ensure_real()
1070
1101
return self._real_repository.make_working_trees()
1103
def refresh_data(self):
1104
"""Re-read any data needed to to synchronise with disk.
1106
This method is intended to be called after another repository instance
1107
(such as one used by a smart server) has inserted data into the
1108
repository. It may not be called during a write group, but may be
1109
called at any other time.
1111
if self.is_in_write_group():
1112
raise errors.InternalBzrError(
1113
"May not refresh_data while in a write group.")
1114
if self._real_repository is not None:
1115
self._real_repository.refresh_data()
1072
1117
def revision_ids_to_search_result(self, result_set):
1073
1118
"""Convert a set of revision ids to a graph SearchResult."""
1074
1119
result_parents = set()
1096
1141
def fetch(self, source, revision_id=None, pb=None, find_ghosts=False,
1097
1142
fetch_spec=None):
1143
# No base implementation to use as RemoteRepository is not a subclass
1144
# of Repository; so this is a copy of Repository.fetch().
1098
1145
if fetch_spec is not None and revision_id is not None:
1099
1146
raise AssertionError(
1100
1147
"fetch_spec and revision_id are mutually exclusive.")
1101
# Not delegated to _real_repository so that InterRepository.get has a
1102
# chance to find an InterRepository specialised for RemoteRepository.
1148
if self.is_in_write_group():
1149
raise errors.InternalBzrError(
1150
"May not fetch while in a write group.")
1151
# fast path same-url fetch operations
1103
1152
if self.has_same_location(source) and fetch_spec is None:
1104
1153
# check that last_revision is in 'from' and then return a
1105
1154
# no-operation.
1107
1156
not revision.is_null(revision_id)):
1108
1157
self.get_revision(revision_id)
1159
# if there is no specific appropriate InterRepository, this will get
1160
# the InterRepository base class, which raises an
1161
# IncompatibleRepositories when asked to fetch.
1110
1162
inter = repository.InterRepository.get(source, self)
1112
return inter.fetch(revision_id=revision_id, pb=pb,
1113
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1114
except NotImplementedError:
1115
raise errors.IncompatibleRepositories(source, self)
1163
return inter.fetch(revision_id=revision_id, pb=pb,
1164
find_ghosts=find_ghosts, fetch_spec=fetch_spec)
1117
1166
def create_bundle(self, target, base, fileobj, format=None):
1118
1167
self._ensure_real()
1201
1250
stop_keys = result_parents.difference(start_set)
1202
1251
included_keys = start_set.intersection(result_parents)
1203
1252
start_set.difference_update(included_keys)
1204
recipe = (start_set, stop_keys, len(parents_map))
1253
recipe = ('manual', start_set, stop_keys, len(parents_map))
1205
1254
body = self._serialise_search_recipe(recipe)
1206
1255
path = self.bzrdir._path_for_remote_call(self._client)
1207
1256
for key in keys:
1466
1515
:param recipe: A search recipe (start, stop, count).
1467
1516
:return: Serialised bytes.
1469
start_keys = ' '.join(recipe[0])
1470
stop_keys = ' '.join(recipe[1])
1471
count = str(recipe[2])
1518
start_keys = ' '.join(recipe[1])
1519
stop_keys = ' '.join(recipe[2])
1520
count = str(recipe[3])
1472
1521
return '\n'.join((start_keys, stop_keys, count))
1474
1523
def _serialise_search_result(self, search_result):
1488
1537
self._ensure_real()
1489
1538
self._real_repository._pack_collection.autopack()
1491
if self._real_repository is not None:
1492
# Reset the real repository's cache of pack names.
1493
# XXX: At some point we may be able to skip this and just rely on
1494
# the automatic retry logic to do the right thing, but for now we
1495
# err on the side of being correct rather than being optimal.
1496
self._real_repository._pack_collection.reload_pack_names()
1497
1541
if response[0] != 'ok':
1498
1542
raise errors.UnexpectedSmartServerResponse(response)
1511
1555
def insert_stream(self, stream, src_format, resume_tokens):
1512
repo = self.target_repo
1513
client = repo._client
1556
target = self.target_repo
1557
if target._lock_token:
1558
verb = 'Repository.insert_stream_locked'
1559
extra_args = (target._lock_token or '',)
1560
required_version = (1, 14)
1562
verb = 'Repository.insert_stream'
1564
required_version = (1, 13)
1565
client = target._client
1514
1566
medium = client._medium
1515
if medium._is_remote_before((1, 13)):
1567
if medium._is_remote_before(required_version):
1516
1568
# No possible way this can work.
1517
1569
return self._insert_real(stream, src_format, resume_tokens)
1518
path = repo.bzrdir._path_for_remote_call(client)
1570
path = target.bzrdir._path_for_remote_call(client)
1519
1571
if not resume_tokens:
1520
1572
# XXX: Ugly but important for correctness, *will* be fixed during
1521
1573
# 1.13 cycle. Pushing a stream that is interrupted results in a
1528
1580
byte_stream = smart_repo._stream_to_byte_stream([], src_format)
1530
1582
response = client.call_with_body_stream(
1531
('Repository.insert_stream', path, ''), byte_stream)
1583
(verb, path, '') + extra_args, byte_stream)
1532
1584
except errors.UnknownSmartMethod:
1533
medium._remember_remote_is_before((1,13))
1585
medium._remember_remote_is_before(required_version)
1534
1586
return self._insert_real(stream, src_format, resume_tokens)
1535
1587
byte_stream = smart_repo._stream_to_byte_stream(
1536
1588
stream, src_format)
1537
1589
resume_tokens = ' '.join(resume_tokens)
1538
1590
response = client.call_with_body_stream(
1539
('Repository.insert_stream', path, resume_tokens), byte_stream)
1591
(verb, path, resume_tokens) + extra_args, byte_stream)
1540
1592
if response[0][0] not in ('ok', 'missing-basis'):
1541
1593
raise errors.UnexpectedSmartServerResponse(response)
1542
1594
if response[0][0] == 'missing-basis':
1544
1596
resume_tokens = tokens
1545
1597
return resume_tokens, missing_keys
1547
if self.target_repo._real_repository is not None:
1548
collection = getattr(self.target_repo._real_repository,
1549
'_pack_collection', None)
1550
if collection is not None:
1551
collection.reload_pack_names()
1599
self.target_repo.refresh_data()
1552
1600
return [], set()
1556
1604
"""Stream data from a remote server."""
1558
1606
def get_stream(self, search):
1559
# streaming with fallback repositories is not well defined yet: The
1560
# remote repository cannot see the fallback repositories, and thus
1561
# cannot satisfy the entire search in the general case. Likewise the
1562
# fallback repositories cannot reify the search to determine what they
1563
# should send. It likely needs a return value in the stream listing the
1564
# edge of the search to resume from in fallback repositories.
1565
if self.from_repository._fallback_repositories:
1566
return repository.StreamSource.get_stream(self, search)
1567
repo = self.from_repository
1607
if (self.from_repository._fallback_repositories and
1608
self.to_format._fetch_order == 'topological'):
1609
return self._real_stream(self.from_repository, search)
1610
return self.missing_parents_chain(search, [self.from_repository] +
1611
self.from_repository._fallback_repositories)
1613
def _real_stream(self, repo, search):
1614
"""Get a stream for search from repo.
1616
This never calls RemoteStreamSource.get_stream, and is a helper
1617
for RemoteStreamSource._get_stream to allow getting a stream
1618
reliably whether falling back because of old servers or trying
1619
to stream from a non-RemoteRepository (which the stacked support
1622
source = repo._get_source(self.to_format)
1623
if isinstance(source, RemoteStreamSource):
1624
return repository.StreamSource.get_stream(source, search)
1625
return source.get_stream(search)
1627
def _get_stream(self, repo, search):
1628
"""Core worker to get a stream from repo for search.
1630
This is used by both get_stream and the stacking support logic. It
1631
deliberately gets a stream for repo which does not need to be
1632
self.from_repository. In the event that repo is not Remote, or
1633
cannot do a smart stream, a fallback is made to the generic
1634
repository._get_stream() interface, via self._real_stream.
1636
In the event of stacking, streams from _get_stream will not
1637
contain all the data for search - this is normal (see get_stream).
1639
:param repo: A repository.
1640
:param search: A search.
1642
# Fallbacks may be non-smart
1643
if not isinstance(repo, RemoteRepository):
1644
return self._real_stream(repo, search)
1568
1645
client = repo._client
1569
1646
medium = client._medium
1570
1647
if medium._is_remote_before((1, 13)):
1571
# No possible way this can work.
1572
return repository.StreamSource.get_stream(self, search)
1648
# streaming was added in 1.13
1649
return self._real_stream(repo, search)
1573
1650
path = repo.bzrdir._path_for_remote_call(client)
1575
1652
search_bytes = repo._serialise_search_result(search)
1579
1656
response_tuple, response_handler = response
1580
1657
except errors.UnknownSmartMethod:
1581
1658
medium._remember_remote_is_before((1,13))
1582
return repository.StreamSource.get_stream(self, search)
1659
return self._real_stream(repo, search)
1583
1660
if response_tuple[0] != 'ok':
1584
1661
raise errors.UnexpectedSmartServerResponse(response_tuple)
1585
1662
byte_stream = response_handler.read_streamed_body()
1590
1667
src_format.network_name(), repo._format.network_name()))
1670
def missing_parents_chain(self, search, sources):
1671
"""Chain multiple streams together to handle stacking.
1673
:param search: The overall search to satisfy with streams.
1674
:param sources: A list of Repository objects to query.
1676
self.serialiser = self.to_format._serializer
1677
self.seen_revs = set()
1678
self.referenced_revs = set()
1679
# If there are heads in the search, or the key count is > 0, we are not
1681
while not search.is_empty() and len(sources) > 1:
1682
source = sources.pop(0)
1683
stream = self._get_stream(source, search)
1684
for kind, substream in stream:
1685
if kind != 'revisions':
1686
yield kind, substream
1688
yield kind, self.missing_parents_rev_handler(substream)
1689
search = search.refine(self.seen_revs, self.referenced_revs)
1690
self.seen_revs = set()
1691
self.referenced_revs = set()
1692
if not search.is_empty():
1693
for kind, stream in self._get_stream(sources[0], search):
1696
def missing_parents_rev_handler(self, substream):
1697
for content in substream:
1698
revision_bytes = content.get_bytes_as('fulltext')
1699
revision = self.serialiser.read_revision_from_string(revision_bytes)
1700
self.seen_revs.add(content.key[-1])
1701
self.referenced_revs.update(revision.parent_ids)
1594
1705
class RemoteBranchLockableFiles(LockableFiles):
1595
1706
"""A 'LockableFiles' implementation that talks to a smart server.