13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
17
"""Server-side repository related request implmentations."""
74
def recreate_search(self, repository, search_bytes):
75
lines = search_bytes.split('\n')
76
if lines[0] == 'ancestry-of':
78
search_result = graph.PendingAncestryResult(heads, repository)
79
return search_result, None
80
elif lines[0] == 'search':
81
return self.recreate_search_from_recipe(repository, lines[1:])
83
return (None, FailedSmartServerResponse(('BadSearch',)))
85
def recreate_search_from_recipe(self, repository, lines):
73
def recreate_search(self, repository, recipe_bytes):
74
lines = recipe_bytes.split('\n')
86
75
start_keys = set(lines[0].split(' '))
87
76
exclude_keys = set(lines[1].split(' '))
88
77
revision_count = int(lines[2])
98
87
search.stop_searching_any(exclude_keys.intersection(next_revs))
99
88
search_result = search.get_result()
100
if search_result.get_recipe()[3] != revision_count:
89
if search_result.get_recipe()[2] != revision_count:
101
90
# we got back a different amount of data than expected, this
102
91
# gets reported as NoSuchRevision, because less revisions
103
92
# indicates missing revisions, and more should never happen as
104
93
# the excludes list considers ghosts and ensures that ghost
105
94
# filling races are not a problem.
106
95
return (None, FailedSmartServerResponse(('NoSuchRevision',)))
107
return (search_result, None)
109
98
repository.unlock()
134
123
from revision_ids is returned. The verb takes a body containing the
135
124
current search state, see do_body for details.
137
If 'include-missing:' is in revision_ids, ghosts encountered in the
138
graph traversal for getting parent data are included in the result with
139
a prefix of 'missing:'.
141
126
:param repository: The repository to query in.
142
127
:param revision_ids: The utf8 encoded revision_id to answer for.
162
147
def _do_repository_request(self, body_bytes):
163
148
repository = self._repository
164
149
revision_ids = set(self._revision_ids)
165
include_missing = 'include-missing:' in revision_ids
167
revision_ids.remove('include-missing:')
168
body_lines = body_bytes.split('\n')
169
search_result, error = self.recreate_search_from_recipe(
170
repository, body_lines)
150
search, error = self.recreate_search(repository, body_bytes)
171
151
if error is not None:
173
153
# TODO might be nice to start up the search again; but thats not
174
154
# written or tested yet.
175
client_seen_revs = set(search_result.get_keys())
155
client_seen_revs = set(search.get_result().get_keys())
176
156
# Always include the requested ids.
177
157
client_seen_revs.difference_update(revision_ids)
186
166
queried_revs.update(next_revs)
187
167
parent_map = repo_graph.get_parent_map(next_revs)
188
current_revs = next_revs
189
168
next_revs = set()
190
for revision_id in current_revs:
192
parents = parent_map.get(revision_id)
193
if parents is not None:
194
# adjust for the wire
195
if parents == (_mod_revision.NULL_REVISION,):
197
# prepare the next query
198
next_revs.update(parents)
199
encoded_id = revision_id
202
encoded_id = "missing:" + revision_id
204
if (revision_id not in client_seen_revs and
205
(not missing_rev or include_missing)):
169
for revision_id, parents in parent_map.iteritems():
170
# adjust for the wire
171
if parents == (_mod_revision.NULL_REVISION,):
173
# prepare the next query
174
next_revs.update(parents)
175
if revision_id not in client_seen_revs:
206
176
# Client does not have this revision, give it to it.
207
177
# add parents to the result
208
result[encoded_id] = parents
178
result[revision_id] = parents
209
179
# Approximate the serialized cost of this revision_id.
210
size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
180
size_so_far += 2 + len(revision_id) + sum(map(len, parents))
211
181
# get all the directly asked for parents, and then flesh out to
212
182
# 64K (compressed) or so. We do one level of depth at a time to
213
183
# stay in sync with the client. The 250000 magic number is
379
349
repository = self._repository
380
350
repository.lock_read()
382
search_result, error = self.recreate_search(repository, body_bytes)
352
search, error = self.recreate_search(repository, body_bytes)
383
353
if error is not None:
384
354
repository.unlock()
356
search = search.get_result()
386
357
source = repository._get_source(self._to_format)
387
stream = source.get_stream(search_result)
358
stream = source.get_stream(search)
388
359
except Exception:
389
360
exc_info = sys.exc_info()
544
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
515
class SmartServerRepositoryInsertStream(SmartServerRepositoryRequest):
545
516
"""Insert a record stream from a RemoteSink into a repository.
547
518
This gets bytes pushed to it by the network infrastructure and turns that
548
519
into a bytes iterator using a thread. That is then processed by
549
520
_byte_stream_to_stream.
554
def do_repository_request(self, repository, resume_tokens, lock_token):
523
def do_repository_request(self, repository, resume_tokens):
555
524
"""StreamSink.insert_stream for a remote repository."""
556
repository.lock_write(token=lock_token)
557
self.do_insert_stream_request(repository, resume_tokens)
559
def do_insert_stream_request(self, repository, resume_tokens):
525
repository.lock_write()
560
526
tokens = [token for token in resume_tokens.split(' ') if token]
561
527
self.tokens = tokens
562
528
self.repository = repository
605
571
self.repository.unlock()
606
572
return SuccessfulSmartServerResponse(('ok', ))
609
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
610
"""Insert a record stream from a RemoteSink into an unlocked repository.
612
This is the same as SmartServerRepositoryInsertStreamLocked, except it
613
takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
614
like pack format) repository.
619
def do_repository_request(self, repository, resume_tokens):
620
"""StreamSink.insert_stream for a remote repository."""
621
repository.lock_write()
622
self.do_insert_stream_request(repository, resume_tokens)