~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Martin Packman
  • Date: 2011-12-08 19:00:14 UTC
  • mto: This revision was merged to the branch mainline in revision 6359.
  • Revision ID: martin.packman@canonical.com-20111208190014-mi8jm6v7jygmhb0r
Use --include-duplicates for make update-pot which already combines multiple msgid strings prettily

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import sys
 
23
import tempfile
 
24
import threading
 
25
import zlib
 
26
 
 
27
from bzrlib import (
 
28
    bencode,
 
29
    errors,
 
30
    estimate_compressed_size,
 
31
    osutils,
 
32
    pack,
 
33
    trace,
 
34
    ui,
 
35
    vf_search,
 
36
    )
 
37
from bzrlib.bzrdir import BzrDir
 
38
from bzrlib.smart.request import (
 
39
    FailedSmartServerResponse,
 
40
    SmartServerRequest,
 
41
    SuccessfulSmartServerResponse,
 
42
    )
 
43
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
44
from bzrlib import revision as _mod_revision
 
45
from bzrlib.versionedfile import (
 
46
    NetworkRecordStream,
 
47
    record_to_fulltext_bytes,
 
48
    )
 
49
 
 
50
 
 
51
class SmartServerRepositoryRequest(SmartServerRequest):
 
52
    """Common base class for Repository requests."""
 
53
 
 
54
    def do(self, path, *args):
 
55
        """Execute a repository request.
 
56
 
 
57
        All Repository requests take a path to the repository as their first
 
58
        argument.  The repository must be at the exact path given by the
 
59
        client - no searching is done.
 
60
 
 
61
        The actual logic is delegated to self.do_repository_request.
 
62
 
 
63
        :param client_path: The path for the repository as received from the
 
64
            client.
 
65
        :return: A SmartServerResponse from self.do_repository_request().
 
66
        """
 
67
        transport = self.transport_from_client_path(path)
 
68
        bzrdir = BzrDir.open_from_transport(transport)
 
69
        # Save the repository for use with do_body.
 
70
        self._repository = bzrdir.open_repository()
 
71
        return self.do_repository_request(self._repository, *args)
 
72
 
 
73
    def do_repository_request(self, repository, *args):
 
74
        """Override to provide an implementation for a verb."""
 
75
        # No-op for verbs that take bodies (None as a result indicates a body
 
76
        # is expected)
 
77
        return None
 
78
 
 
79
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
80
        """Recreate a search from its serialised form.
 
81
 
 
82
        :param discard_excess: If True, and the search refers to data we don't
 
83
            have, just silently accept that fact - the verb calling
 
84
            recreate_search trusts that clients will look for missing things
 
85
            they expected and get it from elsewhere.
 
86
        """
 
87
        if search_bytes == 'everything':
 
88
            return vf_search.EverythingResult(repository), None
 
89
        lines = search_bytes.split('\n')
 
90
        if lines[0] == 'ancestry-of':
 
91
            heads = lines[1:]
 
92
            search_result = vf_search.PendingAncestryResult(heads, repository)
 
93
            return search_result, None
 
94
        elif lines[0] == 'search':
 
95
            return self.recreate_search_from_recipe(repository, lines[1:],
 
96
                discard_excess=discard_excess)
 
97
        else:
 
98
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
99
 
 
100
    def recreate_search_from_recipe(self, repository, lines,
 
101
        discard_excess=False):
 
102
        """Recreate a specific revision search (vs a from-tip search).
 
103
 
 
104
        :param discard_excess: If True, and the search refers to data we don't
 
105
            have, just silently accept that fact - the verb calling
 
106
            recreate_search trusts that clients will look for missing things
 
107
            they expected and get it from elsewhere.
 
108
        """
 
109
        start_keys = set(lines[0].split(' '))
 
110
        exclude_keys = set(lines[1].split(' '))
 
111
        revision_count = int(lines[2])
 
112
        repository.lock_read()
 
113
        try:
 
114
            search = repository.get_graph()._make_breadth_first_searcher(
 
115
                start_keys)
 
116
            while True:
 
117
                try:
 
118
                    next_revs = search.next()
 
119
                except StopIteration:
 
120
                    break
 
121
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
122
            (started_keys, excludes, included_keys) = search.get_state()
 
123
            if (not discard_excess and len(included_keys) != revision_count):
 
124
                # we got back a different amount of data than expected, this
 
125
                # gets reported as NoSuchRevision, because less revisions
 
126
                # indicates missing revisions, and more should never happen as
 
127
                # the excludes list considers ghosts and ensures that ghost
 
128
                # filling races are not a problem.
 
129
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
130
            search_result = vf_search.SearchResult(started_keys, excludes,
 
131
                len(included_keys), included_keys)
 
132
            return (search_result, None)
 
133
        finally:
 
134
            repository.unlock()
 
135
 
 
136
 
 
137
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
138
    """Calls self.do_readlocked_repository_request."""
 
139
 
 
140
    def do_repository_request(self, repository, *args):
 
141
        """Read lock a repository for do_readlocked_repository_request."""
 
142
        repository.lock_read()
 
143
        try:
 
144
            return self.do_readlocked_repository_request(repository, *args)
 
145
        finally:
 
146
            repository.unlock()
 
147
 
 
148
 
 
149
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
150
    """Break a repository lock."""
 
151
 
 
152
    def do_repository_request(self, repository):
 
153
        repository.break_lock()
 
154
        return SuccessfulSmartServerResponse(('ok', ))
 
155
 
 
156
 
 
157
_lsprof_count = 0
 
158
 
 
159
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
160
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
161
 
 
162
    no_extra_results = False
 
163
 
 
164
    def do_repository_request(self, repository, *revision_ids):
 
165
        """Get parent details for some revisions.
 
166
 
 
167
        All the parents for revision_ids are returned. Additionally up to 64KB
 
168
        of additional parent data found by performing a breadth first search
 
169
        from revision_ids is returned. The verb takes a body containing the
 
170
        current search state, see do_body for details.
 
171
 
 
172
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
173
        graph traversal for getting parent data are included in the result with
 
174
        a prefix of 'missing:'.
 
175
 
 
176
        :param repository: The repository to query in.
 
177
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
178
        """
 
179
        self._revision_ids = revision_ids
 
180
        return None # Signal that we want a body.
 
181
 
 
182
    def do_body(self, body_bytes):
 
183
        """Process the current search state and perform the parent lookup.
 
184
 
 
185
        :return: A smart server response where the body contains an utf8
 
186
            encoded flattened list of the parents of the revisions (the same
 
187
            format as Repository.get_revision_graph) which has been bz2
 
188
            compressed.
 
189
        """
 
190
        repository = self._repository
 
191
        repository.lock_read()
 
192
        try:
 
193
            return self._do_repository_request(body_bytes)
 
194
        finally:
 
195
            repository.unlock()
 
196
 
 
197
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
198
                               include_missing, max_size=65536):
 
199
        result = {}
 
200
        queried_revs = set()
 
201
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
 
202
        next_revs = revision_ids
 
203
        first_loop_done = False
 
204
        while next_revs:
 
205
            queried_revs.update(next_revs)
 
206
            parent_map = repo_graph.get_parent_map(next_revs)
 
207
            current_revs = next_revs
 
208
            next_revs = set()
 
209
            for revision_id in current_revs:
 
210
                missing_rev = False
 
211
                parents = parent_map.get(revision_id)
 
212
                if parents is not None:
 
213
                    # adjust for the wire
 
214
                    if parents == (_mod_revision.NULL_REVISION,):
 
215
                        parents = ()
 
216
                    # prepare the next query
 
217
                    next_revs.update(parents)
 
218
                    encoded_id = revision_id
 
219
                else:
 
220
                    missing_rev = True
 
221
                    encoded_id = "missing:" + revision_id
 
222
                    parents = []
 
223
                if (revision_id not in client_seen_revs and
 
224
                    (not missing_rev or include_missing)):
 
225
                    # Client does not have this revision, give it to it.
 
226
                    # add parents to the result
 
227
                    result[encoded_id] = parents
 
228
                    # Approximate the serialized cost of this revision_id.
 
229
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
230
                    estimator.add_content(line)
 
231
            # get all the directly asked for parents, and then flesh out to
 
232
            # 64K (compressed) or so. We do one level of depth at a time to
 
233
            # stay in sync with the client. The 250000 magic number is
 
234
            # estimated compression ratio taken from bzr.dev itself.
 
235
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
236
                trace.mutter('size: %d, z_size: %d'
 
237
                             % (estimator._uncompressed_size_added,
 
238
                                estimator._compressed_size_added))
 
239
                next_revs = set()
 
240
                break
 
241
            # don't query things we've already queried
 
242
            next_revs = next_revs.difference(queried_revs)
 
243
            first_loop_done = True
 
244
        return result
 
245
 
 
246
    def _do_repository_request(self, body_bytes):
 
247
        repository = self._repository
 
248
        revision_ids = set(self._revision_ids)
 
249
        include_missing = 'include-missing:' in revision_ids
 
250
        if include_missing:
 
251
            revision_ids.remove('include-missing:')
 
252
        body_lines = body_bytes.split('\n')
 
253
        search_result, error = self.recreate_search_from_recipe(
 
254
            repository, body_lines)
 
255
        if error is not None:
 
256
            return error
 
257
        # TODO might be nice to start up the search again; but thats not
 
258
        # written or tested yet.
 
259
        client_seen_revs = set(search_result.get_keys())
 
260
        # Always include the requested ids.
 
261
        client_seen_revs.difference_update(revision_ids)
 
262
 
 
263
        repo_graph = repository.get_graph()
 
264
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
265
                                             client_seen_revs, include_missing)
 
266
 
 
267
        # sorting trivially puts lexographically similar revision ids together.
 
268
        # Compression FTW.
 
269
        lines = []
 
270
        for revision, parents in sorted(result.items()):
 
271
            lines.append(' '.join((revision, ) + tuple(parents)))
 
272
 
 
273
        return SuccessfulSmartServerResponse(
 
274
            ('ok', ), bz2.compress('\n'.join(lines)))
 
275
 
 
276
 
 
277
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
278
 
 
279
    def do_readlocked_repository_request(self, repository, revision_id):
 
280
        """Return the result of repository.get_revision_graph(revision_id).
 
281
 
 
282
        Deprecated as of bzr 1.4, but supported for older clients.
 
283
 
 
284
        :param repository: The repository to query in.
 
285
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
286
        :return: A smart server response where the body contains an utf8
 
287
            encoded flattened list of the revision graph.
 
288
        """
 
289
        if not revision_id:
 
290
            revision_id = None
 
291
 
 
292
        lines = []
 
293
        graph = repository.get_graph()
 
294
        if revision_id:
 
295
            search_ids = [revision_id]
 
296
        else:
 
297
            search_ids = repository.all_revision_ids()
 
298
        search = graph._make_breadth_first_searcher(search_ids)
 
299
        transitive_ids = set()
 
300
        map(transitive_ids.update, list(search))
 
301
        parent_map = graph.get_parent_map(transitive_ids)
 
302
        revision_graph = _strip_NULL_ghosts(parent_map)
 
303
        if revision_id and revision_id not in revision_graph:
 
304
            # Note that we return an empty body, rather than omitting the body.
 
305
            # This way the client knows that it can always expect to find a body
 
306
            # in the response for this method, even in the error case.
 
307
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
308
 
 
309
        for revision, parents in revision_graph.items():
 
310
            lines.append(' '.join((revision, ) + tuple(parents)))
 
311
 
 
312
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
313
 
 
314
 
 
315
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
 
316
 
 
317
    def do_readlocked_repository_request(self, repository, revno,
 
318
            known_pair):
 
319
        """Find the revid for a given revno, given a known revno/revid pair.
 
320
        
 
321
        New in 1.17.
 
322
        """
 
323
        try:
 
324
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
 
325
        except errors.RevisionNotPresent, err:
 
326
            if err.revision_id != known_pair[1]:
 
327
                raise AssertionError(
 
328
                    'get_rev_id_for_revno raised RevisionNotPresent for '
 
329
                    'non-initial revision: ' + err.revision_id)
 
330
            return FailedSmartServerResponse(
 
331
                ('nosuchrevision', err.revision_id))
 
332
        if found_flag:
 
333
            return SuccessfulSmartServerResponse(('ok', result))
 
334
        else:
 
335
            earliest_revno, earliest_revid = result
 
336
            return SuccessfulSmartServerResponse(
 
337
                ('history-incomplete', earliest_revno, earliest_revid))
 
338
 
 
339
 
 
340
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
341
 
 
342
    def do_repository_request(self, repository):
 
343
        """Return the serializer format for this repository.
 
344
 
 
345
        New in 2.5.0.
 
346
 
 
347
        :param repository: The repository to query
 
348
        :return: A smart server response ('ok', FORMAT)
 
349
        """
 
350
        serializer = repository.get_serializer_format()
 
351
        return SuccessfulSmartServerResponse(('ok', serializer))
 
352
 
 
353
 
 
354
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
355
 
 
356
    def do_repository_request(self, repository, revision_id):
 
357
        """Return ok if a specific revision is in the repository at path.
 
358
 
 
359
        :param repository: The repository to query in.
 
360
        :param revision_id: The utf8 encoded revision_id to lookup.
 
361
        :return: A smart server response of ('yes', ) if the revision is
 
362
            present. ('no', ) if it is missing.
 
363
        """
 
364
        if repository.has_revision(revision_id):
 
365
            return SuccessfulSmartServerResponse(('yes', ))
 
366
        else:
 
367
            return SuccessfulSmartServerResponse(('no', ))
 
368
 
 
369
 
 
370
class SmartServerRequestHasSignatureForRevisionId(
 
371
        SmartServerRepositoryRequest):
 
372
 
 
373
    def do_repository_request(self, repository, revision_id):
 
374
        """Return ok if a signature is present for a revision.
 
375
 
 
376
        Introduced in bzr 2.5.0.
 
377
 
 
378
        :param repository: The repository to query in.
 
379
        :param revision_id: The utf8 encoded revision_id to lookup.
 
380
        :return: A smart server response of ('yes', ) if a
 
381
            signature for the revision is present,
 
382
            ('no', ) if it is missing.
 
383
        """
 
384
        try:
 
385
            if repository.has_signature_for_revision_id(revision_id):
 
386
                return SuccessfulSmartServerResponse(('yes', ))
 
387
            else:
 
388
                return SuccessfulSmartServerResponse(('no', ))
 
389
        except errors.NoSuchRevision:
 
390
            return FailedSmartServerResponse(
 
391
                ('nosuchrevision', revision_id))
 
392
 
 
393
 
 
394
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
395
 
 
396
    def do_repository_request(self, repository, revid, committers):
 
397
        """Return the result of repository.gather_stats().
 
398
 
 
399
        :param repository: The repository to query in.
 
400
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
401
        :param committers: 'yes' or 'no'.
 
402
 
 
403
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
404
              committers: 1
 
405
              firstrev: 1234.230 0
 
406
              latestrev: 345.700 3600
 
407
              revisions: 2
 
408
 
 
409
              But containing only fields returned by the gather_stats() call
 
410
        """
 
411
        if revid == '':
 
412
            decoded_revision_id = None
 
413
        else:
 
414
            decoded_revision_id = revid
 
415
        if committers == 'yes':
 
416
            decoded_committers = True
 
417
        else:
 
418
            decoded_committers = None
 
419
        try:
 
420
            stats = repository.gather_stats(decoded_revision_id,
 
421
                decoded_committers)
 
422
        except errors.NoSuchRevision:
 
423
            return FailedSmartServerResponse(('nosuchrevision', revid))
 
424
 
 
425
        body = ''
 
426
        if stats.has_key('committers'):
 
427
            body += 'committers: %d\n' % stats['committers']
 
428
        if stats.has_key('firstrev'):
 
429
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
430
        if stats.has_key('latestrev'):
 
431
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
432
        if stats.has_key('revisions'):
 
433
            body += 'revisions: %d\n' % stats['revisions']
 
434
        if stats.has_key('size'):
 
435
            body += 'size: %d\n' % stats['size']
 
436
 
 
437
        return SuccessfulSmartServerResponse(('ok', ), body)
 
438
 
 
439
 
 
440
class SmartServerRepositoryGetRevisionSignatureText(
 
441
        SmartServerRepositoryRequest):
 
442
    """Return the signature text of a revision.
 
443
 
 
444
    New in 2.5.
 
445
    """
 
446
 
 
447
    def do_repository_request(self, repository, revision_id):
 
448
        """Return the result of repository.get_signature_text().
 
449
 
 
450
        :param repository: The repository to query in.
 
451
        :return: A smart server response of with the signature text as
 
452
            body.
 
453
        """
 
454
        try:
 
455
            text = repository.get_signature_text(revision_id)
 
456
        except errors.NoSuchRevision, err:
 
457
            return FailedSmartServerResponse(
 
458
                ('nosuchrevision', err.revision))
 
459
        return SuccessfulSmartServerResponse(('ok', ), text)
 
460
 
 
461
 
 
462
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
463
 
 
464
    def do_repository_request(self, repository):
 
465
        """Return the result of repository.is_shared().
 
466
 
 
467
        :param repository: The repository to query in.
 
468
        :return: A smart server response of ('yes', ) if the repository is
 
469
            shared, and ('no', ) if it is not.
 
470
        """
 
471
        if repository.is_shared():
 
472
            return SuccessfulSmartServerResponse(('yes', ))
 
473
        else:
 
474
            return SuccessfulSmartServerResponse(('no', ))
 
475
 
 
476
 
 
477
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
478
 
 
479
    def do_repository_request(self, repository):
 
480
        """Return the result of repository.make_working_trees().
 
481
 
 
482
        Introduced in bzr 2.5.0.
 
483
 
 
484
        :param repository: The repository to query in.
 
485
        :return: A smart server response of ('yes', ) if the repository uses
 
486
            working trees, and ('no', ) if it is not.
 
487
        """
 
488
        if repository.make_working_trees():
 
489
            return SuccessfulSmartServerResponse(('yes', ))
 
490
        else:
 
491
            return SuccessfulSmartServerResponse(('no', ))
 
492
 
 
493
 
 
494
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
495
 
 
496
    def do_repository_request(self, repository, token=''):
 
497
        # XXX: this probably should not have a token.
 
498
        if token == '':
 
499
            token = None
 
500
        try:
 
501
            token = repository.lock_write(token=token).repository_token
 
502
        except errors.LockContention, e:
 
503
            return FailedSmartServerResponse(('LockContention',))
 
504
        except errors.UnlockableTransport:
 
505
            return FailedSmartServerResponse(('UnlockableTransport',))
 
506
        except errors.LockFailed, e:
 
507
            return FailedSmartServerResponse(('LockFailed',
 
508
                str(e.lock), str(e.why)))
 
509
        if token is not None:
 
510
            repository.leave_lock_in_place()
 
511
        repository.unlock()
 
512
        if token is None:
 
513
            token = ''
 
514
        return SuccessfulSmartServerResponse(('ok', token))
 
515
 
 
516
 
 
517
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
518
 
 
519
    def do_repository_request(self, repository, to_network_name):
 
520
        """Get a stream for inserting into a to_format repository.
 
521
 
 
522
        The request body is 'search_bytes', a description of the revisions
 
523
        being requested.
 
524
 
 
525
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
526
        implementations will respond with a BadSearch error, and clients should
 
527
        catch this and fallback appropriately.
 
528
 
 
529
        :param repository: The repository to stream from.
 
530
        :param to_network_name: The network name of the format of the target
 
531
            repository.
 
532
        """
 
533
        self._to_format = network_format_registry.get(to_network_name)
 
534
        if self._should_fake_unknown():
 
535
            return FailedSmartServerResponse(
 
536
                ('UnknownMethod', 'Repository.get_stream'))
 
537
        return None # Signal that we want a body.
 
538
 
 
539
    def _should_fake_unknown(self):
 
540
        """Return True if we should return UnknownMethod to the client.
 
541
        
 
542
        This is a workaround for bugs in pre-1.19 clients that claim to
 
543
        support receiving streams of CHK repositories.  The pre-1.19 client
 
544
        expects inventory records to be serialized in the format defined by
 
545
        to_network_name, but in pre-1.19 (at least) that format definition
 
546
        tries to use the xml5 serializer, which does not correctly handle
 
547
        rich-roots.  After 1.19 the client can also accept inventory-deltas
 
548
        (which avoids this issue), and those clients will use the
 
549
        Repository.get_stream_1.19 verb instead of this one.
 
550
        So: if this repository is CHK, and the to_format doesn't match,
 
551
        we should just fake an UnknownSmartMethod error so that the client
 
552
        will fallback to VFS, rather than sending it a stream we know it
 
553
        cannot handle.
 
554
        """
 
555
        from_format = self._repository._format
 
556
        to_format = self._to_format
 
557
        if not from_format.supports_chks:
 
558
            # Source not CHK: that's ok
 
559
            return False
 
560
        if (to_format.supports_chks and
 
561
            from_format.repository_class is to_format.repository_class and
 
562
            from_format._serializer == to_format._serializer):
 
563
            # Source is CHK, but target matches: that's ok
 
564
            # (e.g. 2a->2a, or CHK2->2a)
 
565
            return False
 
566
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
 
567
        # generate a compatible stream.
 
568
        return True
 
569
 
 
570
    def do_body(self, body_bytes):
 
571
        repository = self._repository
 
572
        repository.lock_read()
 
573
        try:
 
574
            search_result, error = self.recreate_search(repository, body_bytes,
 
575
                discard_excess=True)
 
576
            if error is not None:
 
577
                repository.unlock()
 
578
                return error
 
579
            source = repository._get_source(self._to_format)
 
580
            stream = source.get_stream(search_result)
 
581
        except Exception:
 
582
            exc_info = sys.exc_info()
 
583
            try:
 
584
                # On non-error, unlocking is done by the body stream handler.
 
585
                repository.unlock()
 
586
            finally:
 
587
                raise exc_info[0], exc_info[1], exc_info[2]
 
588
        return SuccessfulSmartServerResponse(('ok',),
 
589
            body_stream=self.body_stream(stream, repository))
 
590
 
 
591
    def body_stream(self, stream, repository):
 
592
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
593
        try:
 
594
            for bytes in byte_stream:
 
595
                yield bytes
 
596
        except errors.RevisionNotPresent, e:
 
597
            # This shouldn't be able to happen, but as we don't buffer
 
598
            # everything it can in theory happen.
 
599
            repository.unlock()
 
600
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
601
        else:
 
602
            repository.unlock()
 
603
 
 
604
 
 
605
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
606
    """The same as Repository.get_stream, but will return stream CHK formats to
 
607
    clients.
 
608
 
 
609
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
610
    
 
611
    New in 1.19.
 
612
    """
 
613
 
 
614
    def _should_fake_unknown(self):
 
615
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
 
616
        return False
 
617
 
 
618
 
 
619
def _stream_to_byte_stream(stream, src_format):
 
620
    """Convert a record stream to a self delimited byte stream."""
 
621
    pack_writer = pack.ContainerSerialiser()
 
622
    yield pack_writer.begin()
 
623
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
624
    for substream_type, substream in stream:
 
625
        for record in substream:
 
626
            if record.storage_kind in ('chunked', 'fulltext'):
 
627
                serialised = record_to_fulltext_bytes(record)
 
628
            elif record.storage_kind == 'absent':
 
629
                raise ValueError("Absent factory for %s" % (record.key,))
 
630
            else:
 
631
                serialised = record.get_bytes_as(record.storage_kind)
 
632
            if serialised:
 
633
                # Some streams embed the whole stream into the wire
 
634
                # representation of the first record, which means that
 
635
                # later records have no wire representation: we skip them.
 
636
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
637
    yield pack_writer.end()
 
638
 
 
639
 
 
640
class _ByteStreamDecoder(object):
 
641
    """Helper for _byte_stream_to_stream.
 
642
 
 
643
    The expected usage of this class is via the function _byte_stream_to_stream
 
644
    which creates a _ByteStreamDecoder, pops off the stream format and then
 
645
    yields the output of record_stream(), the main entry point to
 
646
    _ByteStreamDecoder.
 
647
 
 
648
    Broadly this class has to unwrap two layers of iterators:
 
649
    (type, substream)
 
650
    (substream details)
 
651
 
 
652
    This is complicated by wishing to return type, iterator_for_type, but
 
653
    getting the data for iterator_for_type when we find out type: we can't
 
654
    simply pass a generator down to the NetworkRecordStream parser, instead
 
655
    we have a little local state to seed each NetworkRecordStream instance,
 
656
    and gather the type that we'll be yielding.
 
657
 
 
658
    :ivar byte_stream: The byte stream being decoded.
 
659
    :ivar stream_decoder: A pack parser used to decode the bytestream
 
660
    :ivar current_type: The current type, used to join adjacent records of the
 
661
        same type into a single stream.
 
662
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
 
663
    """
 
664
 
 
665
    def __init__(self, byte_stream, record_counter):
 
666
        """Create a _ByteStreamDecoder."""
 
667
        self.stream_decoder = pack.ContainerPushParser()
 
668
        self.current_type = None
 
669
        self.first_bytes = None
 
670
        self.byte_stream = byte_stream
 
671
        self._record_counter = record_counter
 
672
        self.key_count = 0
 
673
 
 
674
    def iter_stream_decoder(self):
 
675
        """Iterate the contents of the pack from stream_decoder."""
 
676
        # dequeue pending items
 
677
        for record in self.stream_decoder.read_pending_records():
 
678
            yield record
 
679
        # Pull bytes of the wire, decode them to records, yield those records.
 
680
        for bytes in self.byte_stream:
 
681
            self.stream_decoder.accept_bytes(bytes)
 
682
            for record in self.stream_decoder.read_pending_records():
 
683
                yield record
 
684
 
 
685
    def iter_substream_bytes(self):
 
686
        if self.first_bytes is not None:
 
687
            yield self.first_bytes
 
688
            # If we run out of pack records, single the outer layer to stop.
 
689
            self.first_bytes = None
 
690
        for record in self.iter_pack_records:
 
691
            record_names, record_bytes = record
 
692
            record_name, = record_names
 
693
            substream_type = record_name[0]
 
694
            if substream_type != self.current_type:
 
695
                # end of a substream, seed the next substream.
 
696
                self.current_type = substream_type
 
697
                self.first_bytes = record_bytes
 
698
                return
 
699
            yield record_bytes
 
700
 
 
701
    def record_stream(self):
 
702
        """Yield substream_type, substream from the byte stream."""
 
703
        def wrap_and_count(pb, rc, substream):
 
704
            """Yield records from stream while showing progress."""
 
705
            counter = 0
 
706
            if rc:
 
707
                if self.current_type != 'revisions' and self.key_count != 0:
 
708
                    # As we know the number of revisions now (in self.key_count)
 
709
                    # we can setup and use record_counter (rc).
 
710
                    if not rc.is_initialized():
 
711
                        rc.setup(self.key_count, self.key_count)
 
712
            for record in substream.read():
 
713
                if rc:
 
714
                    if rc.is_initialized() and counter == rc.STEP:
 
715
                        rc.increment(counter)
 
716
                        pb.update('Estimate', rc.current, rc.max)
 
717
                        counter = 0
 
718
                    if self.current_type == 'revisions':
 
719
                        # Total records is proportional to number of revs
 
720
                        # to fetch. With remote, we used self.key_count to
 
721
                        # track the number of revs. Once we have the revs
 
722
                        # counts in self.key_count, the progress bar changes
 
723
                        # from 'Estimating..' to 'Estimate' above.
 
724
                        self.key_count += 1
 
725
                        if counter == rc.STEP:
 
726
                            pb.update('Estimating..', self.key_count)
 
727
                            counter = 0
 
728
                counter += 1
 
729
                yield record
 
730
 
 
731
        self.seed_state()
 
732
        pb = ui.ui_factory.nested_progress_bar()
 
733
        rc = self._record_counter
 
734
        # Make and consume sub generators, one per substream type:
 
735
        while self.first_bytes is not None:
 
736
            substream = NetworkRecordStream(self.iter_substream_bytes())
 
737
            # after substream is fully consumed, self.current_type is set to
 
738
            # the next type, and self.first_bytes is set to the matching bytes.
 
739
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
740
        if rc:
 
741
            pb.update('Done', rc.max, rc.max)
 
742
        pb.finished()
 
743
 
 
744
    def seed_state(self):
 
745
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
 
746
        # Set a single generator we can use to get data from the pack stream.
 
747
        self.iter_pack_records = self.iter_stream_decoder()
 
748
        # Seed the very first subiterator with content; after this each one
 
749
        # seeds the next.
 
750
        list(self.iter_substream_bytes())
 
751
 
 
752
 
 
753
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
754
    """Convert a byte stream into a format and a stream.
 
755
 
 
756
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
757
    :return: (RepositoryFormat, stream_generator)
 
758
    """
 
759
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
760
    for bytes in byte_stream:
 
761
        decoder.stream_decoder.accept_bytes(bytes)
 
762
        for record in decoder.stream_decoder.read_pending_records(max=1):
 
763
            record_names, src_format_name = record
 
764
            src_format = network_format_registry.get(src_format_name)
 
765
            return src_format, decoder.record_stream()
 
766
 
 
767
 
 
768
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
769
 
 
770
    def do_repository_request(self, repository, token):
 
771
        try:
 
772
            repository.lock_write(token=token)
 
773
        except errors.TokenMismatch, e:
 
774
            return FailedSmartServerResponse(('TokenMismatch',))
 
775
        repository.dont_leave_lock_in_place()
 
776
        repository.unlock()
 
777
        return SuccessfulSmartServerResponse(('ok',))
 
778
 
 
779
 
 
780
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
781
    """Get the physical lock status for a repository.
 
782
 
 
783
    New in 2.5.
 
784
    """
 
785
 
 
786
    def do_repository_request(self, repository):
 
787
        if repository.get_physical_lock_status():
 
788
            return SuccessfulSmartServerResponse(('yes', ))
 
789
        else:
 
790
            return SuccessfulSmartServerResponse(('no', ))
 
791
 
 
792
 
 
793
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
794
 
 
795
    def do_repository_request(self, repository, str_bool_new_value):
 
796
        if str_bool_new_value == 'True':
 
797
            new_value = True
 
798
        else:
 
799
            new_value = False
 
800
        repository.set_make_working_trees(new_value)
 
801
        return SuccessfulSmartServerResponse(('ok',))
 
802
 
 
803
 
 
804
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
805
    """Get the raw repository files as a tarball.
 
806
 
 
807
    The returned tarball contains a .bzr control directory which in turn
 
808
    contains a repository.
 
809
 
 
810
    This takes one parameter, compression, which currently must be
 
811
    "", "gz", or "bz2".
 
812
 
 
813
    This is used to implement the Repository.copy_content_into operation.
 
814
    """
 
815
 
 
816
    def do_repository_request(self, repository, compression):
 
817
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
818
        try:
 
819
            controldir_name = tmp_dirname + '/.bzr'
 
820
            return self._tarfile_response(controldir_name, compression)
 
821
        finally:
 
822
            osutils.rmtree(tmp_dirname)
 
823
 
 
824
    def _copy_to_tempdir(self, from_repo):
 
825
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
826
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
827
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
828
        from_repo.copy_content_into(tmp_repo)
 
829
        return tmp_dirname, tmp_repo
 
830
 
 
831
    def _tarfile_response(self, tmp_dirname, compression):
 
832
        temp = tempfile.NamedTemporaryFile()
 
833
        try:
 
834
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
835
            # all finished; write the tempfile out to the network
 
836
            temp.seek(0)
 
837
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
838
            # FIXME: Don't read the whole thing into memory here; rather stream
 
839
            # it out from the file onto the network. mbp 20070411
 
840
        finally:
 
841
            temp.close()
 
842
 
 
843
    def _tarball_of_dir(self, dirname, compression, ofile):
 
844
        import tarfile
 
845
        filename = os.path.basename(ofile.name)
 
846
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
847
            mode='w|' + compression)
 
848
        try:
 
849
            # The tarball module only accepts ascii names, and (i guess)
 
850
            # packs them with their 8bit names.  We know all the files
 
851
            # within the repository have ASCII names so the should be safe
 
852
            # to pack in.
 
853
            dirname = dirname.encode(sys.getfilesystemencoding())
 
854
            # python's tarball module includes the whole path by default so
 
855
            # override it
 
856
            if not dirname.endswith('.bzr'):
 
857
                raise ValueError(dirname)
 
858
            tarball.add(dirname, '.bzr') # recursive by default
 
859
        finally:
 
860
            tarball.close()
 
861
 
 
862
 
 
863
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
864
    """Insert a record stream from a RemoteSink into a repository.
 
865
 
 
866
    This gets bytes pushed to it by the network infrastructure and turns that
 
867
    into a bytes iterator using a thread. That is then processed by
 
868
    _byte_stream_to_stream.
 
869
 
 
870
    New in 1.14.
 
871
    """
 
872
 
 
873
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
874
        """StreamSink.insert_stream for a remote repository."""
 
875
        repository.lock_write(token=lock_token)
 
876
        self.do_insert_stream_request(repository, resume_tokens)
 
877
 
 
878
    def do_insert_stream_request(self, repository, resume_tokens):
 
879
        tokens = [token for token in resume_tokens.split(' ') if token]
 
880
        self.tokens = tokens
 
881
        self.repository = repository
 
882
        self.queue = Queue.Queue()
 
883
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
884
        self.insert_thread.start()
 
885
 
 
886
    def do_chunk(self, body_stream_chunk):
 
887
        self.queue.put(body_stream_chunk)
 
888
 
 
889
    def _inserter_thread(self):
 
890
        try:
 
891
            src_format, stream = _byte_stream_to_stream(
 
892
                self.blocking_byte_stream())
 
893
            self.insert_result = self.repository._get_sink().insert_stream(
 
894
                stream, src_format, self.tokens)
 
895
            self.insert_ok = True
 
896
        except:
 
897
            self.insert_exception = sys.exc_info()
 
898
            self.insert_ok = False
 
899
 
 
900
    def blocking_byte_stream(self):
 
901
        while True:
 
902
            bytes = self.queue.get()
 
903
            if bytes is StopIteration:
 
904
                return
 
905
            else:
 
906
                yield bytes
 
907
 
 
908
    def do_end(self):
 
909
        self.queue.put(StopIteration)
 
910
        if self.insert_thread is not None:
 
911
            self.insert_thread.join()
 
912
        if not self.insert_ok:
 
913
            exc_info = self.insert_exception
 
914
            raise exc_info[0], exc_info[1], exc_info[2]
 
915
        write_group_tokens, missing_keys = self.insert_result
 
916
        if write_group_tokens or missing_keys:
 
917
            # bzip needed? missing keys should typically be a small set.
 
918
            # Should this be a streaming body response ?
 
919
            missing_keys = sorted(missing_keys)
 
920
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
921
            self.repository.unlock()
 
922
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
923
        else:
 
924
            self.repository.unlock()
 
925
            return SuccessfulSmartServerResponse(('ok', ))
 
926
 
 
927
 
 
928
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
 
929
    """Insert a record stream from a RemoteSink into a repository.
 
930
 
 
931
    Same as SmartServerRepositoryInsertStreamLocked, except:
 
932
     - the lock token argument is optional
 
933
     - servers that implement this verb accept 'inventory-delta' records in the
 
934
       stream.
 
935
 
 
936
    New in 1.19.
 
937
    """
 
938
 
 
939
    def do_repository_request(self, repository, resume_tokens, lock_token=None):
 
940
        """StreamSink.insert_stream for a remote repository."""
 
941
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
 
942
            self, repository, resume_tokens, lock_token)
 
943
 
 
944
 
 
945
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
946
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
947
 
 
948
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
949
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
950
    like pack format) repository.
 
951
 
 
952
    New in 1.13.
 
953
    """
 
954
 
 
955
    def do_repository_request(self, repository, resume_tokens):
 
956
        """StreamSink.insert_stream for a remote repository."""
 
957
        repository.lock_write()
 
958
        self.do_insert_stream_request(repository, resume_tokens)
 
959
 
 
960
 
 
961
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
962
    """Add a revision signature text.
 
963
 
 
964
    New in 2.5.
 
965
    """
 
966
 
 
967
    def do_repository_request(self, repository, lock_token, revision_id,
 
968
            *write_group_tokens):
 
969
        """Add a revision signature text.
 
970
 
 
971
        :param repository: Repository to operate on
 
972
        :param lock_token: Lock token
 
973
        :param revision_id: Revision for which to add signature
 
974
        :param write_group_tokens: Write group tokens
 
975
        """
 
976
        self._lock_token = lock_token
 
977
        self._revision_id = revision_id
 
978
        self._write_group_tokens = write_group_tokens
 
979
        return None
 
980
 
 
981
    def do_body(self, body_bytes):
 
982
        """Add a signature text.
 
983
 
 
984
        :param body_bytes: GPG signature text
 
985
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
986
            the list of new write group tokens.
 
987
        """
 
988
        self._repository.lock_write(token=self._lock_token)
 
989
        try:
 
990
            self._repository.resume_write_group(self._write_group_tokens)
 
991
            try:
 
992
                self._repository.add_signature_text(self._revision_id,
 
993
                    body_bytes)
 
994
            finally:
 
995
                new_write_group_tokens = self._repository.suspend_write_group()
 
996
        finally:
 
997
            self._repository.unlock()
 
998
        return SuccessfulSmartServerResponse(
 
999
            ('ok', ) + tuple(new_write_group_tokens))
 
1000
 
 
1001
 
 
1002
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1003
    """Start a write group.
 
1004
 
 
1005
    New in 2.5.
 
1006
    """
 
1007
 
 
1008
    def do_repository_request(self, repository, lock_token):
 
1009
        """Start a write group."""
 
1010
        repository.lock_write(token=lock_token)
 
1011
        try:
 
1012
            repository.start_write_group()
 
1013
            try:
 
1014
                tokens = repository.suspend_write_group()
 
1015
            except errors.UnsuspendableWriteGroup:
 
1016
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1017
        finally:
 
1018
            repository.unlock()
 
1019
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1020
 
 
1021
 
 
1022
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1023
    """Commit a write group.
 
1024
 
 
1025
    New in 2.5.
 
1026
    """
 
1027
 
 
1028
    def do_repository_request(self, repository, lock_token,
 
1029
            write_group_tokens):
 
1030
        """Commit a write group."""
 
1031
        repository.lock_write(token=lock_token)
 
1032
        try:
 
1033
            try:
 
1034
                repository.resume_write_group(write_group_tokens)
 
1035
            except errors.UnresumableWriteGroup, e:
 
1036
                return FailedSmartServerResponse(
 
1037
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1038
            try:
 
1039
                repository.commit_write_group()
 
1040
            except:
 
1041
                write_group_tokens = repository.suspend_write_group()
 
1042
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1043
                # have changed?
 
1044
                raise
 
1045
        finally:
 
1046
            repository.unlock()
 
1047
        return SuccessfulSmartServerResponse(('ok', ))
 
1048
 
 
1049
 
 
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1051
    """Abort a write group.
 
1052
 
 
1053
    New in 2.5.
 
1054
    """
 
1055
 
 
1056
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1057
        """Abort a write group."""
 
1058
        repository.lock_write(token=lock_token)
 
1059
        try:
 
1060
            try:
 
1061
                repository.resume_write_group(write_group_tokens)
 
1062
            except errors.UnresumableWriteGroup, e:
 
1063
                return FailedSmartServerResponse(
 
1064
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1065
                repository.abort_write_group()
 
1066
        finally:
 
1067
            repository.unlock()
 
1068
        return SuccessfulSmartServerResponse(('ok', ))
 
1069
 
 
1070
 
 
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1072
    """Check that a write group is still valid.
 
1073
 
 
1074
    New in 2.5.
 
1075
    """
 
1076
 
 
1077
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1078
        """Abort a write group."""
 
1079
        repository.lock_write(token=lock_token)
 
1080
        try:
 
1081
            try:
 
1082
                repository.resume_write_group(write_group_tokens)
 
1083
            except errors.UnresumableWriteGroup, e:
 
1084
                return FailedSmartServerResponse(
 
1085
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1086
            else:
 
1087
                repository.suspend_write_group()
 
1088
        finally:
 
1089
            repository.unlock()
 
1090
        return SuccessfulSmartServerResponse(('ok', ))
 
1091
 
 
1092
 
 
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1094
    """Retrieve all of the revision ids in a repository.
 
1095
 
 
1096
    New in 2.5.
 
1097
    """
 
1098
 
 
1099
    def do_repository_request(self, repository):
 
1100
        revids = repository.all_revision_ids()
 
1101
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1102
 
 
1103
 
 
1104
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1105
    """Pack a repository.
 
1106
 
 
1107
    New in 2.5.
 
1108
    """
 
1109
 
 
1110
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1111
        self._repository = repository
 
1112
        self._lock_token = lock_token
 
1113
        if clean_obsolete_packs == 'True':
 
1114
            self._clean_obsolete_packs = True
 
1115
        else:
 
1116
            self._clean_obsolete_packs = False
 
1117
        return None
 
1118
 
 
1119
    def do_body(self, body_bytes):
 
1120
        if body_bytes == "":
 
1121
            hint = None
 
1122
        else:
 
1123
            hint = body_bytes.splitlines()
 
1124
        self._repository.lock_write(token=self._lock_token)
 
1125
        try:
 
1126
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1127
        finally:
 
1128
            self._repository.unlock()
 
1129
        return SuccessfulSmartServerResponse(("ok", ), )
 
1130
 
 
1131
 
 
1132
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1133
    """Iterate over the contents of files.
 
1134
 
 
1135
    The client sends a list of desired files to stream, one
 
1136
    per line, and as tuples of file id and revision, separated by
 
1137
    \0.
 
1138
 
 
1139
    The server replies with a stream. Each entry is preceded by a header,
 
1140
    which can either be:
 
1141
 
 
1142
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1143
        list sent by the client. This header is followed by the contents of
 
1144
        the file, bzip2-compressed.
 
1145
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1146
        The client can then raise an appropriate RevisionNotPresent error
 
1147
        or check its fallback repositories.
 
1148
 
 
1149
    New in 2.5.
 
1150
    """
 
1151
 
 
1152
    def body_stream(self, repository, desired_files):
 
1153
        self._repository.lock_read()
 
1154
        try:
 
1155
            text_keys = {}
 
1156
            for i, key in enumerate(desired_files):
 
1157
                text_keys[key] = i
 
1158
            for record in repository.texts.get_record_stream(text_keys,
 
1159
                    'unordered', True):
 
1160
                identifier = text_keys[record.key]
 
1161
                if record.storage_kind == 'absent':
 
1162
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1163
                        record.key[1], identifier)
 
1164
                    # FIXME: Way to abort early?
 
1165
                    continue
 
1166
                yield "ok\0%d\n" % identifier
 
1167
                compressor = zlib.compressobj()
 
1168
                for bytes in record.get_bytes_as('chunked'):
 
1169
                    data = compressor.compress(bytes)
 
1170
                    if data:
 
1171
                        yield data
 
1172
                data = compressor.flush()
 
1173
                if data:
 
1174
                    yield data
 
1175
        finally:
 
1176
            self._repository.unlock()
 
1177
 
 
1178
    def do_body(self, body_bytes):
 
1179
        desired_files = [
 
1180
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1181
        return SuccessfulSmartServerResponse(('ok', ),
 
1182
            body_stream=self.body_stream(self._repository, desired_files))
 
1183
 
 
1184
    def do_repository_request(self, repository):
 
1185
        # Signal that we want a body
 
1186
        return None
 
1187
 
 
1188
 
 
1189
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1190
    """Stream a list of revisions.
 
1191
 
 
1192
    The client sends a list of newline-separated revision ids in the
 
1193
    body of the request and the server replies with the serializer format,
 
1194
    and a stream of bzip2-compressed revision texts (using the specified
 
1195
    serializer format).
 
1196
 
 
1197
    Any revisions the server does not have are omitted from the stream.
 
1198
 
 
1199
    New in 2.5.
 
1200
    """
 
1201
 
 
1202
    def do_repository_request(self, repository):
 
1203
        self._repository = repository
 
1204
        # Signal there is a body
 
1205
        return None
 
1206
 
 
1207
    def do_body(self, body_bytes):
 
1208
        revision_ids = body_bytes.split("\n")
 
1209
        return SuccessfulSmartServerResponse(
 
1210
            ('ok', self._repository.get_serializer_format()),
 
1211
            body_stream=self.body_stream(self._repository, revision_ids))
 
1212
 
 
1213
    def body_stream(self, repository, revision_ids):
 
1214
        self._repository.lock_read()
 
1215
        try:
 
1216
            for record in repository.revisions.get_record_stream(
 
1217
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1218
                if record.storage_kind == 'absent':
 
1219
                    continue
 
1220
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1221
        finally:
 
1222
            self._repository.unlock()