13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
17
"""Server-side repository related request implmentations."""
28
27
from bzrlib import (
39
40
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
40
41
from bzrlib import revision as _mod_revision
41
from bzrlib.util import bencode
42
42
from bzrlib.versionedfile import NetworkRecordStream, record_to_fulltext_bytes
73
def recreate_search(self, repository, recipe_bytes):
74
lines = recipe_bytes.split('\n')
73
def recreate_search(self, repository, search_bytes, discard_excess=False):
74
"""Recreate a search from its serialised form.
76
:param discard_excess: If True, and the search refers to data we don't
77
have, just silently accept that fact - the verb calling
78
recreate_search trusts that clients will look for missing things
79
they expected and get it from elsewhere.
81
lines = search_bytes.split('\n')
82
if lines[0] == 'ancestry-of':
84
search_result = graph.PendingAncestryResult(heads, repository)
85
return search_result, None
86
elif lines[0] == 'search':
87
return self.recreate_search_from_recipe(repository, lines[1:],
88
discard_excess=discard_excess)
90
return (None, FailedSmartServerResponse(('BadSearch',)))
92
def recreate_search_from_recipe(self, repository, lines,
93
discard_excess=False):
94
"""Recreate a specific revision search (vs a from-tip search).
96
:param discard_excess: If True, and the search refers to data we don't
97
have, just silently accept that fact - the verb calling
98
recreate_search trusts that clients will look for missing things
99
they expected and get it from elsewhere.
75
101
start_keys = set(lines[0].split(' '))
76
102
exclude_keys = set(lines[1].split(' '))
77
103
revision_count = int(lines[2])
87
113
search.stop_searching_any(exclude_keys.intersection(next_revs))
88
114
search_result = search.get_result()
89
if search_result.get_recipe()[2] != revision_count:
115
if (not discard_excess and
116
search_result.get_recipe()[3] != revision_count):
90
117
# we got back a different amount of data than expected, this
91
118
# gets reported as NoSuchRevision, because fewer revisions
92
119
# indicates missing revisions, and more should never happen as
93
120
# the excludes list considers ghosts and ensures that ghost
94
121
# filling races are not a problem.
95
122
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
123
return (search_result, None)
98
125
repository.unlock()
123
150
from revision_ids is returned. The verb takes a body containing the
124
151
current search state, see do_body for details.
153
If 'include-missing:' is in revision_ids, ghosts encountered in the
154
graph traversal for getting parent data are included in the result with
155
a prefix of 'missing:'.
126
157
:param repository: The repository to query in.
127
158
:param revision_ids: The utf8 encoded revision_id to answer for.
147
178
def _do_repository_request(self, body_bytes):
148
179
repository = self._repository
149
180
revision_ids = set(self._revision_ids)
150
search, error = self.recreate_search(repository, body_bytes)
181
include_missing = 'include-missing:' in revision_ids
183
revision_ids.remove('include-missing:')
184
body_lines = body_bytes.split('\n')
185
search_result, error = self.recreate_search_from_recipe(
186
repository, body_lines)
151
187
if error is not None:
153
189
# TODO might be nice to start up the search again; but that's not
154
190
# written or tested yet.
155
client_seen_revs = set(search.get_result().get_keys())
191
client_seen_revs = set(search_result.get_keys())
156
192
# Always include the requested ids.
157
193
client_seen_revs.difference_update(revision_ids)
166
202
queried_revs.update(next_revs)
167
203
parent_map = repo_graph.get_parent_map(next_revs)
204
current_revs = next_revs
168
205
next_revs = set()
169
for revision_id, parents in parent_map.iteritems():
170
# adjust for the wire
171
if parents == (_mod_revision.NULL_REVISION,):
173
# prepare the next query
174
next_revs.update(parents)
175
if revision_id not in client_seen_revs:
206
for revision_id in current_revs:
208
parents = parent_map.get(revision_id)
209
if parents is not None:
210
# adjust for the wire
211
if parents == (_mod_revision.NULL_REVISION,):
213
# prepare the next query
214
next_revs.update(parents)
215
encoded_id = revision_id
218
encoded_id = "missing:" + revision_id
220
if (revision_id not in client_seen_revs and
221
(not missing_rev or include_missing)):
176
222
# Client does not have this revision, give it to it.
177
223
# add parents to the result
178
result[revision_id] = parents
224
result[encoded_id] = parents
179
225
# Approximate the serialized cost of this revision_id.
180
size_so_far += 2 + len(revision_id) + sum(map(len, parents))
226
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
181
227
# get all the directly asked for parents, and then flesh out to
182
228
# 64K (compressed) or so. We do one level of depth at a time to
183
229
# stay in sync with the client. The 250000 magic number is
237
283
return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
286
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
288
def do_readlocked_repository_request(self, repository, revno,
290
"""Find the revid for a given revno, given a known revno/revid pair.
295
found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
296
except errors.RevisionNotPresent, err:
297
if err.revision_id != known_pair[1]:
298
raise AssertionError(
299
'get_rev_id_for_revno raised RevisionNotPresent for '
300
'non-initial revision: ' + err.revision_id)
301
return FailedSmartServerResponse(
302
('nosuchrevision', err.revision_id))
304
return SuccessfulSmartServerResponse(('ok', result))
306
earliest_revno, earliest_revid = result
307
return SuccessfulSmartServerResponse(
308
('history-incomplete', earliest_revno, earliest_revid))
240
311
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
242
313
def do_repository_request(self, repository, revision_id):
349
420
repository = self._repository
350
421
repository.lock_read()
352
search, error = self.recreate_search(repository, body_bytes)
423
search_result, error = self.recreate_search(repository, body_bytes,
353
425
if error is not None:
354
426
repository.unlock()
356
search = search.get_result()
357
428
source = repository._get_source(self._to_format)
358
stream = source.get_stream(search)
429
stream = source.get_stream(search_result)
359
430
except Exception:
360
431
exc_info = sys.exc_info()
389
460
for record in substream:
390
461
if record.storage_kind in ('chunked', 'fulltext'):
391
462
serialised = record_to_fulltext_bytes(record)
463
elif record.storage_kind == 'absent':
464
raise ValueError("Absent factory for %s" % (record.key,))
393
466
serialised = record.get_bytes_as(record.storage_kind)
515
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
588
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
516
589
"""Insert a record stream from a RemoteSink into a repository.
518
591
This gets bytes pushed to it by the network infrastructure and turns that
519
592
into a bytes iterator using a thread. That is then processed by
520
593
_byte_stream_to_stream.
523
def do_repository_request(self, repository, resume_tokens):
598
def do_repository_request(self, repository, resume_tokens, lock_token):
524
599
"""StreamSink.insert_stream for a remote repository."""
525
repository.lock_write()
600
repository.lock_write(token=lock_token)
601
self.do_insert_stream_request(repository, resume_tokens)
603
def do_insert_stream_request(self, repository, resume_tokens):
526
604
tokens = [token for token in resume_tokens.split(' ') if token]
527
605
self.tokens = tokens
528
606
self.repository = repository
571
649
self.repository.unlock()
572
650
return SuccessfulSmartServerResponse(('ok', ))
653
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
654
"""Insert a record stream from a RemoteSink into an unlocked repository.
656
This is the same as SmartServerRepositoryInsertStreamLocked, except it
657
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
658
like pack format) repository.
663
def do_repository_request(self, repository, resume_tokens):
664
"""StreamSink.insert_stream for a remote repository."""
665
repository.lock_write()
666
self.do_insert_stream_request(repository, resume_tokens)