~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Patch Queue Manager
  • Date: 2011-11-29 15:26:50 UTC
  • mfrom: (6325.2.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20111129152650-pjga2r7h97qes8dh
(vila) Fix various typos (Vincent Ladeuil)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2006-2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
"""Server-side repository related request implmentations."""
 
18
 
 
19
import bz2
 
20
import os
 
21
import Queue
 
22
import sys
 
23
import tempfile
 
24
import threading
 
25
import zlib
 
26
 
 
27
from bzrlib import (
 
28
    bencode,
 
29
    errors,
 
30
    estimate_compressed_size,
 
31
    graph,
 
32
    osutils,
 
33
    pack,
 
34
    trace,
 
35
    ui,
 
36
    )
 
37
from bzrlib.bzrdir import BzrDir
 
38
from bzrlib.smart.request import (
 
39
    FailedSmartServerResponse,
 
40
    SmartServerRequest,
 
41
    SuccessfulSmartServerResponse,
 
42
    )
 
43
from bzrlib.repository import _strip_NULL_ghosts, network_format_registry
 
44
from bzrlib import revision as _mod_revision
 
45
from bzrlib.versionedfile import (
 
46
    NetworkRecordStream,
 
47
    record_to_fulltext_bytes,
 
48
    )
 
49
 
 
50
 
 
51
class SmartServerRepositoryRequest(SmartServerRequest):
 
52
    """Common base class for Repository requests."""
 
53
 
 
54
    def do(self, path, *args):
 
55
        """Execute a repository request.
 
56
 
 
57
        All Repository requests take a path to the repository as their first
 
58
        argument.  The repository must be at the exact path given by the
 
59
        client - no searching is done.
 
60
 
 
61
        The actual logic is delegated to self.do_repository_request.
 
62
 
 
63
        :param client_path: The path for the repository as received from the
 
64
            client.
 
65
        :return: A SmartServerResponse from self.do_repository_request().
 
66
        """
 
67
        transport = self.transport_from_client_path(path)
 
68
        bzrdir = BzrDir.open_from_transport(transport)
 
69
        # Save the repository for use with do_body.
 
70
        self._repository = bzrdir.open_repository()
 
71
        return self.do_repository_request(self._repository, *args)
 
72
 
 
73
    def do_repository_request(self, repository, *args):
 
74
        """Override to provide an implementation for a verb."""
 
75
        # No-op for verbs that take bodies (None as a result indicates a body
 
76
        # is expected)
 
77
        return None
 
78
 
 
79
    def recreate_search(self, repository, search_bytes, discard_excess=False):
 
80
        """Recreate a search from its serialised form.
 
81
 
 
82
        :param discard_excess: If True, and the search refers to data we don't
 
83
            have, just silently accept that fact - the verb calling
 
84
            recreate_search trusts that clients will look for missing things
 
85
            they expected and get it from elsewhere.
 
86
        """
 
87
        if search_bytes == 'everything':
 
88
            return graph.EverythingResult(repository), None
 
89
        lines = search_bytes.split('\n')
 
90
        if lines[0] == 'ancestry-of':
 
91
            heads = lines[1:]
 
92
            search_result = graph.PendingAncestryResult(heads, repository)
 
93
            return search_result, None
 
94
        elif lines[0] == 'search':
 
95
            return self.recreate_search_from_recipe(repository, lines[1:],
 
96
                discard_excess=discard_excess)
 
97
        else:
 
98
            return (None, FailedSmartServerResponse(('BadSearch',)))
 
99
 
 
100
    def recreate_search_from_recipe(self, repository, lines,
 
101
        discard_excess=False):
 
102
        """Recreate a specific revision search (vs a from-tip search).
 
103
 
 
104
        :param discard_excess: If True, and the search refers to data we don't
 
105
            have, just silently accept that fact - the verb calling
 
106
            recreate_search trusts that clients will look for missing things
 
107
            they expected and get it from elsewhere.
 
108
        """
 
109
        start_keys = set(lines[0].split(' '))
 
110
        exclude_keys = set(lines[1].split(' '))
 
111
        revision_count = int(lines[2])
 
112
        repository.lock_read()
 
113
        try:
 
114
            search = repository.get_graph()._make_breadth_first_searcher(
 
115
                start_keys)
 
116
            while True:
 
117
                try:
 
118
                    next_revs = search.next()
 
119
                except StopIteration:
 
120
                    break
 
121
                search.stop_searching_any(exclude_keys.intersection(next_revs))
 
122
            search_result = search.get_result()
 
123
            if (not discard_excess and
 
124
                search_result.get_recipe()[3] != revision_count):
 
125
                # we got back a different amount of data than expected, this
 
126
                # gets reported as NoSuchRevision, because less revisions
 
127
                # indicates missing revisions, and more should never happen as
 
128
                # the excludes list considers ghosts and ensures that ghost
 
129
                # filling races are not a problem.
 
130
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
131
            return (search_result, None)
 
132
        finally:
 
133
            repository.unlock()
 
134
 
 
135
 
 
136
class SmartServerRepositoryReadLocked(SmartServerRepositoryRequest):
 
137
    """Calls self.do_readlocked_repository_request."""
 
138
 
 
139
    def do_repository_request(self, repository, *args):
 
140
        """Read lock a repository for do_readlocked_repository_request."""
 
141
        repository.lock_read()
 
142
        try:
 
143
            return self.do_readlocked_repository_request(repository, *args)
 
144
        finally:
 
145
            repository.unlock()
 
146
 
 
147
 
 
148
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
149
    """Break a repository lock."""
 
150
 
 
151
    def do_repository_request(self, repository):
 
152
        repository.break_lock()
 
153
        return SuccessfulSmartServerResponse(('ok', ))
 
154
 
 
155
 
 
156
_lsprof_count = 0
 
157
 
 
158
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
 
159
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
 
160
 
 
161
    no_extra_results = False
 
162
 
 
163
    def do_repository_request(self, repository, *revision_ids):
 
164
        """Get parent details for some revisions.
 
165
 
 
166
        All the parents for revision_ids are returned. Additionally up to 64KB
 
167
        of additional parent data found by performing a breadth first search
 
168
        from revision_ids is returned. The verb takes a body containing the
 
169
        current search state, see do_body for details.
 
170
 
 
171
        If 'include-missing:' is in revision_ids, ghosts encountered in the
 
172
        graph traversal for getting parent data are included in the result with
 
173
        a prefix of 'missing:'.
 
174
 
 
175
        :param repository: The repository to query in.
 
176
        :param revision_ids: The utf8 encoded revision_id to answer for.
 
177
        """
 
178
        self._revision_ids = revision_ids
 
179
        return None # Signal that we want a body.
 
180
 
 
181
    def do_body(self, body_bytes):
 
182
        """Process the current search state and perform the parent lookup.
 
183
 
 
184
        :return: A smart server response where the body contains an utf8
 
185
            encoded flattened list of the parents of the revisions (the same
 
186
            format as Repository.get_revision_graph) which has been bz2
 
187
            compressed.
 
188
        """
 
189
        repository = self._repository
 
190
        repository.lock_read()
 
191
        try:
 
192
            return self._do_repository_request(body_bytes)
 
193
        finally:
 
194
            repository.unlock()
 
195
 
 
196
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
197
                               include_missing, max_size=65536):
 
198
        result = {}
 
199
        queried_revs = set()
 
200
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
 
201
        next_revs = revision_ids
 
202
        first_loop_done = False
 
203
        while next_revs:
 
204
            queried_revs.update(next_revs)
 
205
            parent_map = repo_graph.get_parent_map(next_revs)
 
206
            current_revs = next_revs
 
207
            next_revs = set()
 
208
            for revision_id in current_revs:
 
209
                missing_rev = False
 
210
                parents = parent_map.get(revision_id)
 
211
                if parents is not None:
 
212
                    # adjust for the wire
 
213
                    if parents == (_mod_revision.NULL_REVISION,):
 
214
                        parents = ()
 
215
                    # prepare the next query
 
216
                    next_revs.update(parents)
 
217
                    encoded_id = revision_id
 
218
                else:
 
219
                    missing_rev = True
 
220
                    encoded_id = "missing:" + revision_id
 
221
                    parents = []
 
222
                if (revision_id not in client_seen_revs and
 
223
                    (not missing_rev or include_missing)):
 
224
                    # Client does not have this revision, give it to it.
 
225
                    # add parents to the result
 
226
                    result[encoded_id] = parents
 
227
                    # Approximate the serialized cost of this revision_id.
 
228
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
229
                    estimator.add_content(line)
 
230
            # get all the directly asked for parents, and then flesh out to
 
231
            # 64K (compressed) or so. We do one level of depth at a time to
 
232
            # stay in sync with the client. The 250000 magic number is
 
233
            # estimated compression ratio taken from bzr.dev itself.
 
234
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
235
                trace.mutter('size: %d, z_size: %d'
 
236
                             % (estimator._uncompressed_size_added,
 
237
                                estimator._compressed_size_added))
 
238
                next_revs = set()
 
239
                break
 
240
            # don't query things we've already queried
 
241
            next_revs = next_revs.difference(queried_revs)
 
242
            first_loop_done = True
 
243
        return result
 
244
 
 
245
    def _do_repository_request(self, body_bytes):
 
246
        repository = self._repository
 
247
        revision_ids = set(self._revision_ids)
 
248
        include_missing = 'include-missing:' in revision_ids
 
249
        if include_missing:
 
250
            revision_ids.remove('include-missing:')
 
251
        body_lines = body_bytes.split('\n')
 
252
        search_result, error = self.recreate_search_from_recipe(
 
253
            repository, body_lines)
 
254
        if error is not None:
 
255
            return error
 
256
        # TODO might be nice to start up the search again; but thats not
 
257
        # written or tested yet.
 
258
        client_seen_revs = set(search_result.get_keys())
 
259
        # Always include the requested ids.
 
260
        client_seen_revs.difference_update(revision_ids)
 
261
 
 
262
        repo_graph = repository.get_graph()
 
263
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
264
                                             client_seen_revs, include_missing)
 
265
 
 
266
        # sorting trivially puts lexographically similar revision ids together.
 
267
        # Compression FTW.
 
268
        lines = []
 
269
        for revision, parents in sorted(result.items()):
 
270
            lines.append(' '.join((revision, ) + tuple(parents)))
 
271
 
 
272
        return SuccessfulSmartServerResponse(
 
273
            ('ok', ), bz2.compress('\n'.join(lines)))
 
274
 
 
275
 
 
276
class SmartServerRepositoryGetRevisionGraph(SmartServerRepositoryReadLocked):
 
277
 
 
278
    def do_readlocked_repository_request(self, repository, revision_id):
 
279
        """Return the result of repository.get_revision_graph(revision_id).
 
280
 
 
281
        Deprecated as of bzr 1.4, but supported for older clients.
 
282
 
 
283
        :param repository: The repository to query in.
 
284
        :param revision_id: The utf8 encoded revision_id to get a graph from.
 
285
        :return: A smart server response where the body contains an utf8
 
286
            encoded flattened list of the revision graph.
 
287
        """
 
288
        if not revision_id:
 
289
            revision_id = None
 
290
 
 
291
        lines = []
 
292
        graph = repository.get_graph()
 
293
        if revision_id:
 
294
            search_ids = [revision_id]
 
295
        else:
 
296
            search_ids = repository.all_revision_ids()
 
297
        search = graph._make_breadth_first_searcher(search_ids)
 
298
        transitive_ids = set()
 
299
        map(transitive_ids.update, list(search))
 
300
        parent_map = graph.get_parent_map(transitive_ids)
 
301
        revision_graph = _strip_NULL_ghosts(parent_map)
 
302
        if revision_id and revision_id not in revision_graph:
 
303
            # Note that we return an empty body, rather than omitting the body.
 
304
            # This way the client knows that it can always expect to find a body
 
305
            # in the response for this method, even in the error case.
 
306
            return FailedSmartServerResponse(('nosuchrevision', revision_id), '')
 
307
 
 
308
        for revision, parents in revision_graph.items():
 
309
            lines.append(' '.join((revision, ) + tuple(parents)))
 
310
 
 
311
        return SuccessfulSmartServerResponse(('ok', ), '\n'.join(lines))
 
312
 
 
313
 
 
314
class SmartServerRepositoryGetRevIdForRevno(SmartServerRepositoryReadLocked):
 
315
 
 
316
    def do_readlocked_repository_request(self, repository, revno,
 
317
            known_pair):
 
318
        """Find the revid for a given revno, given a known revno/revid pair.
 
319
        
 
320
        New in 1.17.
 
321
        """
 
322
        try:
 
323
            found_flag, result = repository.get_rev_id_for_revno(revno, known_pair)
 
324
        except errors.RevisionNotPresent, err:
 
325
            if err.revision_id != known_pair[1]:
 
326
                raise AssertionError(
 
327
                    'get_rev_id_for_revno raised RevisionNotPresent for '
 
328
                    'non-initial revision: ' + err.revision_id)
 
329
            return FailedSmartServerResponse(
 
330
                ('nosuchrevision', err.revision_id))
 
331
        if found_flag:
 
332
            return SuccessfulSmartServerResponse(('ok', result))
 
333
        else:
 
334
            earliest_revno, earliest_revid = result
 
335
            return SuccessfulSmartServerResponse(
 
336
                ('history-incomplete', earliest_revno, earliest_revid))
 
337
 
 
338
 
 
339
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
340
 
 
341
    def do_repository_request(self, repository):
 
342
        """Return the serializer format for this repository.
 
343
 
 
344
        New in 2.5.0.
 
345
 
 
346
        :param repository: The repository to query
 
347
        :return: A smart server response ('ok', FORMAT)
 
348
        """
 
349
        serializer = repository.get_serializer_format()
 
350
        return SuccessfulSmartServerResponse(('ok', serializer))
 
351
 
 
352
 
 
353
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
 
354
 
 
355
    def do_repository_request(self, repository, revision_id):
 
356
        """Return ok if a specific revision is in the repository at path.
 
357
 
 
358
        :param repository: The repository to query in.
 
359
        :param revision_id: The utf8 encoded revision_id to lookup.
 
360
        :return: A smart server response of ('yes', ) if the revision is
 
361
            present. ('no', ) if it is missing.
 
362
        """
 
363
        if repository.has_revision(revision_id):
 
364
            return SuccessfulSmartServerResponse(('yes', ))
 
365
        else:
 
366
            return SuccessfulSmartServerResponse(('no', ))
 
367
 
 
368
 
 
369
class SmartServerRequestHasSignatureForRevisionId(
 
370
        SmartServerRepositoryRequest):
 
371
 
 
372
    def do_repository_request(self, repository, revision_id):
 
373
        """Return ok if a signature is present for a revision.
 
374
 
 
375
        Introduced in bzr 2.5.0.
 
376
 
 
377
        :param repository: The repository to query in.
 
378
        :param revision_id: The utf8 encoded revision_id to lookup.
 
379
        :return: A smart server response of ('yes', ) if a
 
380
            signature for the revision is present,
 
381
            ('no', ) if it is missing.
 
382
        """
 
383
        try:
 
384
            if repository.has_signature_for_revision_id(revision_id):
 
385
                return SuccessfulSmartServerResponse(('yes', ))
 
386
            else:
 
387
                return SuccessfulSmartServerResponse(('no', ))
 
388
        except errors.NoSuchRevision:
 
389
            return FailedSmartServerResponse(
 
390
                ('nosuchrevision', revision_id))
 
391
 
 
392
 
 
393
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
 
394
 
 
395
    def do_repository_request(self, repository, revid, committers):
 
396
        """Return the result of repository.gather_stats().
 
397
 
 
398
        :param repository: The repository to query in.
 
399
        :param revid: utf8 encoded rev id or an empty string to indicate None
 
400
        :param committers: 'yes' or 'no'.
 
401
 
 
402
        :return: A SmartServerResponse ('ok',), a encoded body looking like
 
403
              committers: 1
 
404
              firstrev: 1234.230 0
 
405
              latestrev: 345.700 3600
 
406
              revisions: 2
 
407
 
 
408
              But containing only fields returned by the gather_stats() call
 
409
        """
 
410
        if revid == '':
 
411
            decoded_revision_id = None
 
412
        else:
 
413
            decoded_revision_id = revid
 
414
        if committers == 'yes':
 
415
            decoded_committers = True
 
416
        else:
 
417
            decoded_committers = None
 
418
        try:
 
419
            stats = repository.gather_stats(decoded_revision_id,
 
420
                decoded_committers)
 
421
        except errors.NoSuchRevision:
 
422
            return FailedSmartServerResponse(('nosuchrevision', revid))
 
423
 
 
424
        body = ''
 
425
        if stats.has_key('committers'):
 
426
            body += 'committers: %d\n' % stats['committers']
 
427
        if stats.has_key('firstrev'):
 
428
            body += 'firstrev: %.3f %d\n' % stats['firstrev']
 
429
        if stats.has_key('latestrev'):
 
430
             body += 'latestrev: %.3f %d\n' % stats['latestrev']
 
431
        if stats.has_key('revisions'):
 
432
            body += 'revisions: %d\n' % stats['revisions']
 
433
        if stats.has_key('size'):
 
434
            body += 'size: %d\n' % stats['size']
 
435
 
 
436
        return SuccessfulSmartServerResponse(('ok', ), body)
 
437
 
 
438
 
 
439
class SmartServerRepositoryGetRevisionSignatureText(
 
440
        SmartServerRepositoryRequest):
 
441
    """Return the signature text of a revision.
 
442
 
 
443
    New in 2.5.
 
444
    """
 
445
 
 
446
    def do_repository_request(self, repository, revision_id):
 
447
        """Return the result of repository.get_signature_text().
 
448
 
 
449
        :param repository: The repository to query in.
 
450
        :return: A smart server response of with the signature text as
 
451
            body.
 
452
        """
 
453
        try:
 
454
            text = repository.get_signature_text(revision_id)
 
455
        except errors.NoSuchRevision, err:
 
456
            return FailedSmartServerResponse(
 
457
                ('nosuchrevision', err.revision))
 
458
        return SuccessfulSmartServerResponse(('ok', ), text)
 
459
 
 
460
 
 
461
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
 
462
 
 
463
    def do_repository_request(self, repository):
 
464
        """Return the result of repository.is_shared().
 
465
 
 
466
        :param repository: The repository to query in.
 
467
        :return: A smart server response of ('yes', ) if the repository is
 
468
            shared, and ('no', ) if it is not.
 
469
        """
 
470
        if repository.is_shared():
 
471
            return SuccessfulSmartServerResponse(('yes', ))
 
472
        else:
 
473
            return SuccessfulSmartServerResponse(('no', ))
 
474
 
 
475
 
 
476
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
477
 
 
478
    def do_repository_request(self, repository):
 
479
        """Return the result of repository.make_working_trees().
 
480
 
 
481
        Introduced in bzr 2.5.0.
 
482
 
 
483
        :param repository: The repository to query in.
 
484
        :return: A smart server response of ('yes', ) if the repository uses
 
485
            working trees, and ('no', ) if it is not.
 
486
        """
 
487
        if repository.make_working_trees():
 
488
            return SuccessfulSmartServerResponse(('yes', ))
 
489
        else:
 
490
            return SuccessfulSmartServerResponse(('no', ))
 
491
 
 
492
 
 
493
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
 
494
 
 
495
    def do_repository_request(self, repository, token=''):
 
496
        # XXX: this probably should not have a token.
 
497
        if token == '':
 
498
            token = None
 
499
        try:
 
500
            token = repository.lock_write(token=token).repository_token
 
501
        except errors.LockContention, e:
 
502
            return FailedSmartServerResponse(('LockContention',))
 
503
        except errors.UnlockableTransport:
 
504
            return FailedSmartServerResponse(('UnlockableTransport',))
 
505
        except errors.LockFailed, e:
 
506
            return FailedSmartServerResponse(('LockFailed',
 
507
                str(e.lock), str(e.why)))
 
508
        if token is not None:
 
509
            repository.leave_lock_in_place()
 
510
        repository.unlock()
 
511
        if token is None:
 
512
            token = ''
 
513
        return SuccessfulSmartServerResponse(('ok', token))
 
514
 
 
515
 
 
516
class SmartServerRepositoryGetStream(SmartServerRepositoryRequest):
 
517
 
 
518
    def do_repository_request(self, repository, to_network_name):
 
519
        """Get a stream for inserting into a to_format repository.
 
520
 
 
521
        The request body is 'search_bytes', a description of the revisions
 
522
        being requested.
 
523
 
 
524
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
525
        implementations will respond with a BadSearch error, and clients should
 
526
        catch this and fallback appropriately.
 
527
 
 
528
        :param repository: The repository to stream from.
 
529
        :param to_network_name: The network name of the format of the target
 
530
            repository.
 
531
        """
 
532
        self._to_format = network_format_registry.get(to_network_name)
 
533
        if self._should_fake_unknown():
 
534
            return FailedSmartServerResponse(
 
535
                ('UnknownMethod', 'Repository.get_stream'))
 
536
        return None # Signal that we want a body.
 
537
 
 
538
    def _should_fake_unknown(self):
 
539
        """Return True if we should return UnknownMethod to the client.
 
540
        
 
541
        This is a workaround for bugs in pre-1.19 clients that claim to
 
542
        support receiving streams of CHK repositories.  The pre-1.19 client
 
543
        expects inventory records to be serialized in the format defined by
 
544
        to_network_name, but in pre-1.19 (at least) that format definition
 
545
        tries to use the xml5 serializer, which does not correctly handle
 
546
        rich-roots.  After 1.19 the client can also accept inventory-deltas
 
547
        (which avoids this issue), and those clients will use the
 
548
        Repository.get_stream_1.19 verb instead of this one.
 
549
        So: if this repository is CHK, and the to_format doesn't match,
 
550
        we should just fake an UnknownSmartMethod error so that the client
 
551
        will fallback to VFS, rather than sending it a stream we know it
 
552
        cannot handle.
 
553
        """
 
554
        from_format = self._repository._format
 
555
        to_format = self._to_format
 
556
        if not from_format.supports_chks:
 
557
            # Source not CHK: that's ok
 
558
            return False
 
559
        if (to_format.supports_chks and
 
560
            from_format.repository_class is to_format.repository_class and
 
561
            from_format._serializer == to_format._serializer):
 
562
            # Source is CHK, but target matches: that's ok
 
563
            # (e.g. 2a->2a, or CHK2->2a)
 
564
            return False
 
565
        # Source is CHK, and target is not CHK or incompatible CHK.  We can't
 
566
        # generate a compatible stream.
 
567
        return True
 
568
 
 
569
    def do_body(self, body_bytes):
 
570
        repository = self._repository
 
571
        repository.lock_read()
 
572
        try:
 
573
            search_result, error = self.recreate_search(repository, body_bytes,
 
574
                discard_excess=True)
 
575
            if error is not None:
 
576
                repository.unlock()
 
577
                return error
 
578
            source = repository._get_source(self._to_format)
 
579
            stream = source.get_stream(search_result)
 
580
        except Exception:
 
581
            exc_info = sys.exc_info()
 
582
            try:
 
583
                # On non-error, unlocking is done by the body stream handler.
 
584
                repository.unlock()
 
585
            finally:
 
586
                raise exc_info[0], exc_info[1], exc_info[2]
 
587
        return SuccessfulSmartServerResponse(('ok',),
 
588
            body_stream=self.body_stream(stream, repository))
 
589
 
 
590
    def body_stream(self, stream, repository):
 
591
        byte_stream = _stream_to_byte_stream(stream, repository._format)
 
592
        try:
 
593
            for bytes in byte_stream:
 
594
                yield bytes
 
595
        except errors.RevisionNotPresent, e:
 
596
            # This shouldn't be able to happen, but as we don't buffer
 
597
            # everything it can in theory happen.
 
598
            repository.unlock()
 
599
            yield FailedSmartServerResponse(('NoSuchRevision', e.revision_id))
 
600
        else:
 
601
            repository.unlock()
 
602
 
 
603
 
 
604
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
605
    """The same as Repository.get_stream, but will return stream CHK formats to
 
606
    clients.
 
607
 
 
608
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
609
    
 
610
    New in 1.19.
 
611
    """
 
612
 
 
613
    def _should_fake_unknown(self):
 
614
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
 
615
        return False
 
616
 
 
617
 
 
618
def _stream_to_byte_stream(stream, src_format):
 
619
    """Convert a record stream to a self delimited byte stream."""
 
620
    pack_writer = pack.ContainerSerialiser()
 
621
    yield pack_writer.begin()
 
622
    yield pack_writer.bytes_record(src_format.network_name(), '')
 
623
    for substream_type, substream in stream:
 
624
        for record in substream:
 
625
            if record.storage_kind in ('chunked', 'fulltext'):
 
626
                serialised = record_to_fulltext_bytes(record)
 
627
            elif record.storage_kind == 'absent':
 
628
                raise ValueError("Absent factory for %s" % (record.key,))
 
629
            else:
 
630
                serialised = record.get_bytes_as(record.storage_kind)
 
631
            if serialised:
 
632
                # Some streams embed the whole stream into the wire
 
633
                # representation of the first record, which means that
 
634
                # later records have no wire representation: we skip them.
 
635
                yield pack_writer.bytes_record(serialised, [(substream_type,)])
 
636
    yield pack_writer.end()
 
637
 
 
638
 
 
639
class _ByteStreamDecoder(object):
 
640
    """Helper for _byte_stream_to_stream.
 
641
 
 
642
    The expected usage of this class is via the function _byte_stream_to_stream
 
643
    which creates a _ByteStreamDecoder, pops off the stream format and then
 
644
    yields the output of record_stream(), the main entry point to
 
645
    _ByteStreamDecoder.
 
646
 
 
647
    Broadly this class has to unwrap two layers of iterators:
 
648
    (type, substream)
 
649
    (substream details)
 
650
 
 
651
    This is complicated by wishing to return type, iterator_for_type, but
 
652
    getting the data for iterator_for_type when we find out type: we can't
 
653
    simply pass a generator down to the NetworkRecordStream parser, instead
 
654
    we have a little local state to seed each NetworkRecordStream instance,
 
655
    and gather the type that we'll be yielding.
 
656
 
 
657
    :ivar byte_stream: The byte stream being decoded.
 
658
    :ivar stream_decoder: A pack parser used to decode the bytestream
 
659
    :ivar current_type: The current type, used to join adjacent records of the
 
660
        same type into a single stream.
 
661
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
 
662
    """
 
663
 
 
664
    def __init__(self, byte_stream, record_counter):
 
665
        """Create a _ByteStreamDecoder."""
 
666
        self.stream_decoder = pack.ContainerPushParser()
 
667
        self.current_type = None
 
668
        self.first_bytes = None
 
669
        self.byte_stream = byte_stream
 
670
        self._record_counter = record_counter
 
671
        self.key_count = 0
 
672
 
 
673
    def iter_stream_decoder(self):
 
674
        """Iterate the contents of the pack from stream_decoder."""
 
675
        # dequeue pending items
 
676
        for record in self.stream_decoder.read_pending_records():
 
677
            yield record
 
678
        # Pull bytes of the wire, decode them to records, yield those records.
 
679
        for bytes in self.byte_stream:
 
680
            self.stream_decoder.accept_bytes(bytes)
 
681
            for record in self.stream_decoder.read_pending_records():
 
682
                yield record
 
683
 
 
684
    def iter_substream_bytes(self):
 
685
        if self.first_bytes is not None:
 
686
            yield self.first_bytes
 
687
            # If we run out of pack records, single the outer layer to stop.
 
688
            self.first_bytes = None
 
689
        for record in self.iter_pack_records:
 
690
            record_names, record_bytes = record
 
691
            record_name, = record_names
 
692
            substream_type = record_name[0]
 
693
            if substream_type != self.current_type:
 
694
                # end of a substream, seed the next substream.
 
695
                self.current_type = substream_type
 
696
                self.first_bytes = record_bytes
 
697
                return
 
698
            yield record_bytes
 
699
 
 
700
    def record_stream(self):
 
701
        """Yield substream_type, substream from the byte stream."""
 
702
        def wrap_and_count(pb, rc, substream):
 
703
            """Yield records from stream while showing progress."""
 
704
            counter = 0
 
705
            if rc:
 
706
                if self.current_type != 'revisions' and self.key_count != 0:
 
707
                    # As we know the number of revisions now (in self.key_count)
 
708
                    # we can setup and use record_counter (rc).
 
709
                    if not rc.is_initialized():
 
710
                        rc.setup(self.key_count, self.key_count)
 
711
            for record in substream.read():
 
712
                if rc:
 
713
                    if rc.is_initialized() and counter == rc.STEP:
 
714
                        rc.increment(counter)
 
715
                        pb.update('Estimate', rc.current, rc.max)
 
716
                        counter = 0
 
717
                    if self.current_type == 'revisions':
 
718
                        # Total records is proportional to number of revs
 
719
                        # to fetch. With remote, we used self.key_count to
 
720
                        # track the number of revs. Once we have the revs
 
721
                        # counts in self.key_count, the progress bar changes
 
722
                        # from 'Estimating..' to 'Estimate' above.
 
723
                        self.key_count += 1
 
724
                        if counter == rc.STEP:
 
725
                            pb.update('Estimating..', self.key_count)
 
726
                            counter = 0
 
727
                counter += 1
 
728
                yield record
 
729
 
 
730
        self.seed_state()
 
731
        pb = ui.ui_factory.nested_progress_bar()
 
732
        rc = self._record_counter
 
733
        # Make and consume sub generators, one per substream type:
 
734
        while self.first_bytes is not None:
 
735
            substream = NetworkRecordStream(self.iter_substream_bytes())
 
736
            # after substream is fully consumed, self.current_type is set to
 
737
            # the next type, and self.first_bytes is set to the matching bytes.
 
738
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
739
        if rc:
 
740
            pb.update('Done', rc.max, rc.max)
 
741
        pb.finished()
 
742
 
 
743
    def seed_state(self):
 
744
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
 
745
        # Set a single generator we can use to get data from the pack stream.
 
746
        self.iter_pack_records = self.iter_stream_decoder()
 
747
        # Seed the very first subiterator with content; after this each one
 
748
        # seeds the next.
 
749
        list(self.iter_substream_bytes())
 
750
 
 
751
 
 
752
def _byte_stream_to_stream(byte_stream, record_counter=None):
 
753
    """Convert a byte stream into a format and a stream.
 
754
 
 
755
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
 
756
    :return: (RepositoryFormat, stream_generator)
 
757
    """
 
758
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
 
759
    for bytes in byte_stream:
 
760
        decoder.stream_decoder.accept_bytes(bytes)
 
761
        for record in decoder.stream_decoder.read_pending_records(max=1):
 
762
            record_names, src_format_name = record
 
763
            src_format = network_format_registry.get(src_format_name)
 
764
            return src_format, decoder.record_stream()
 
765
 
 
766
 
 
767
class SmartServerRepositoryUnlock(SmartServerRepositoryRequest):
 
768
 
 
769
    def do_repository_request(self, repository, token):
 
770
        try:
 
771
            repository.lock_write(token=token)
 
772
        except errors.TokenMismatch, e:
 
773
            return FailedSmartServerResponse(('TokenMismatch',))
 
774
        repository.dont_leave_lock_in_place()
 
775
        repository.unlock()
 
776
        return SuccessfulSmartServerResponse(('ok',))
 
777
 
 
778
 
 
779
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
780
    """Get the physical lock status for a repository.
 
781
 
 
782
    New in 2.5.
 
783
    """
 
784
 
 
785
    def do_repository_request(self, repository):
 
786
        if repository.get_physical_lock_status():
 
787
            return SuccessfulSmartServerResponse(('yes', ))
 
788
        else:
 
789
            return SuccessfulSmartServerResponse(('no', ))
 
790
 
 
791
 
 
792
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
 
793
 
 
794
    def do_repository_request(self, repository, str_bool_new_value):
 
795
        if str_bool_new_value == 'True':
 
796
            new_value = True
 
797
        else:
 
798
            new_value = False
 
799
        repository.set_make_working_trees(new_value)
 
800
        return SuccessfulSmartServerResponse(('ok',))
 
801
 
 
802
 
 
803
class SmartServerRepositoryTarball(SmartServerRepositoryRequest):
 
804
    """Get the raw repository files as a tarball.
 
805
 
 
806
    The returned tarball contains a .bzr control directory which in turn
 
807
    contains a repository.
 
808
 
 
809
    This takes one parameter, compression, which currently must be
 
810
    "", "gz", or "bz2".
 
811
 
 
812
    This is used to implement the Repository.copy_content_into operation.
 
813
    """
 
814
 
 
815
    def do_repository_request(self, repository, compression):
 
816
        tmp_dirname, tmp_repo = self._copy_to_tempdir(repository)
 
817
        try:
 
818
            controldir_name = tmp_dirname + '/.bzr'
 
819
            return self._tarfile_response(controldir_name, compression)
 
820
        finally:
 
821
            osutils.rmtree(tmp_dirname)
 
822
 
 
823
    def _copy_to_tempdir(self, from_repo):
 
824
        tmp_dirname = osutils.mkdtemp(prefix='tmpbzrclone')
 
825
        tmp_bzrdir = from_repo.bzrdir._format.initialize(tmp_dirname)
 
826
        tmp_repo = from_repo._format.initialize(tmp_bzrdir)
 
827
        from_repo.copy_content_into(tmp_repo)
 
828
        return tmp_dirname, tmp_repo
 
829
 
 
830
    def _tarfile_response(self, tmp_dirname, compression):
 
831
        temp = tempfile.NamedTemporaryFile()
 
832
        try:
 
833
            self._tarball_of_dir(tmp_dirname, compression, temp.file)
 
834
            # all finished; write the tempfile out to the network
 
835
            temp.seek(0)
 
836
            return SuccessfulSmartServerResponse(('ok',), temp.read())
 
837
            # FIXME: Don't read the whole thing into memory here; rather stream
 
838
            # it out from the file onto the network. mbp 20070411
 
839
        finally:
 
840
            temp.close()
 
841
 
 
842
    def _tarball_of_dir(self, dirname, compression, ofile):
 
843
        import tarfile
 
844
        filename = os.path.basename(ofile.name)
 
845
        tarball = tarfile.open(fileobj=ofile, name=filename,
 
846
            mode='w|' + compression)
 
847
        try:
 
848
            # The tarball module only accepts ascii names, and (i guess)
 
849
            # packs them with their 8bit names.  We know all the files
 
850
            # within the repository have ASCII names so the should be safe
 
851
            # to pack in.
 
852
            dirname = dirname.encode(sys.getfilesystemencoding())
 
853
            # python's tarball module includes the whole path by default so
 
854
            # override it
 
855
            if not dirname.endswith('.bzr'):
 
856
                raise ValueError(dirname)
 
857
            tarball.add(dirname, '.bzr') # recursive by default
 
858
        finally:
 
859
            tarball.close()
 
860
 
 
861
 
 
862
class SmartServerRepositoryInsertStreamLocked(SmartServerRepositoryRequest):
 
863
    """Insert a record stream from a RemoteSink into a repository.
 
864
 
 
865
    This gets bytes pushed to it by the network infrastructure and turns that
 
866
    into a bytes iterator using a thread. That is then processed by
 
867
    _byte_stream_to_stream.
 
868
 
 
869
    New in 1.14.
 
870
    """
 
871
 
 
872
    def do_repository_request(self, repository, resume_tokens, lock_token):
 
873
        """StreamSink.insert_stream for a remote repository."""
 
874
        repository.lock_write(token=lock_token)
 
875
        self.do_insert_stream_request(repository, resume_tokens)
 
876
 
 
877
    def do_insert_stream_request(self, repository, resume_tokens):
 
878
        tokens = [token for token in resume_tokens.split(' ') if token]
 
879
        self.tokens = tokens
 
880
        self.repository = repository
 
881
        self.queue = Queue.Queue()
 
882
        self.insert_thread = threading.Thread(target=self._inserter_thread)
 
883
        self.insert_thread.start()
 
884
 
 
885
    def do_chunk(self, body_stream_chunk):
 
886
        self.queue.put(body_stream_chunk)
 
887
 
 
888
    def _inserter_thread(self):
 
889
        try:
 
890
            src_format, stream = _byte_stream_to_stream(
 
891
                self.blocking_byte_stream())
 
892
            self.insert_result = self.repository._get_sink().insert_stream(
 
893
                stream, src_format, self.tokens)
 
894
            self.insert_ok = True
 
895
        except:
 
896
            self.insert_exception = sys.exc_info()
 
897
            self.insert_ok = False
 
898
 
 
899
    def blocking_byte_stream(self):
 
900
        while True:
 
901
            bytes = self.queue.get()
 
902
            if bytes is StopIteration:
 
903
                return
 
904
            else:
 
905
                yield bytes
 
906
 
 
907
    def do_end(self):
 
908
        self.queue.put(StopIteration)
 
909
        if self.insert_thread is not None:
 
910
            self.insert_thread.join()
 
911
        if not self.insert_ok:
 
912
            exc_info = self.insert_exception
 
913
            raise exc_info[0], exc_info[1], exc_info[2]
 
914
        write_group_tokens, missing_keys = self.insert_result
 
915
        if write_group_tokens or missing_keys:
 
916
            # bzip needed? missing keys should typically be a small set.
 
917
            # Should this be a streaming body response ?
 
918
            missing_keys = sorted(missing_keys)
 
919
            bytes = bencode.bencode((write_group_tokens, missing_keys))
 
920
            self.repository.unlock()
 
921
            return SuccessfulSmartServerResponse(('missing-basis', bytes))
 
922
        else:
 
923
            self.repository.unlock()
 
924
            return SuccessfulSmartServerResponse(('ok', ))
 
925
 
 
926
 
 
927
class SmartServerRepositoryInsertStream_1_19(SmartServerRepositoryInsertStreamLocked):
 
928
    """Insert a record stream from a RemoteSink into a repository.
 
929
 
 
930
    Same as SmartServerRepositoryInsertStreamLocked, except:
 
931
     - the lock token argument is optional
 
932
     - servers that implement this verb accept 'inventory-delta' records in the
 
933
       stream.
 
934
 
 
935
    New in 1.19.
 
936
    """
 
937
 
 
938
    def do_repository_request(self, repository, resume_tokens, lock_token=None):
 
939
        """StreamSink.insert_stream for a remote repository."""
 
940
        SmartServerRepositoryInsertStreamLocked.do_repository_request(
 
941
            self, repository, resume_tokens, lock_token)
 
942
 
 
943
 
 
944
class SmartServerRepositoryInsertStream(SmartServerRepositoryInsertStreamLocked):
 
945
    """Insert a record stream from a RemoteSink into an unlocked repository.
 
946
 
 
947
    This is the same as SmartServerRepositoryInsertStreamLocked, except it
 
948
    takes no lock_tokens; i.e. it works with an unlocked (or lock-free, e.g.
 
949
    like pack format) repository.
 
950
 
 
951
    New in 1.13.
 
952
    """
 
953
 
 
954
    def do_repository_request(self, repository, resume_tokens):
 
955
        """StreamSink.insert_stream for a remote repository."""
 
956
        repository.lock_write()
 
957
        self.do_insert_stream_request(repository, resume_tokens)
 
958
 
 
959
 
 
960
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
961
    """Add a revision signature text.
 
962
 
 
963
    New in 2.5.
 
964
    """
 
965
 
 
966
    def do_repository_request(self, repository, lock_token, revision_id,
 
967
            *write_group_tokens):
 
968
        """Add a revision signature text.
 
969
 
 
970
        :param repository: Repository to operate on
 
971
        :param lock_token: Lock token
 
972
        :param revision_id: Revision for which to add signature
 
973
        :param write_group_tokens: Write group tokens
 
974
        """
 
975
        self._lock_token = lock_token
 
976
        self._revision_id = revision_id
 
977
        self._write_group_tokens = write_group_tokens
 
978
        return None
 
979
 
 
980
    def do_body(self, body_bytes):
 
981
        """Add a signature text.
 
982
 
 
983
        :param body_bytes: GPG signature text
 
984
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
985
            the list of new write group tokens.
 
986
        """
 
987
        self._repository.lock_write(token=self._lock_token)
 
988
        try:
 
989
            self._repository.resume_write_group(self._write_group_tokens)
 
990
            try:
 
991
                self._repository.add_signature_text(self._revision_id,
 
992
                    body_bytes)
 
993
            finally:
 
994
                new_write_group_tokens = self._repository.suspend_write_group()
 
995
        finally:
 
996
            self._repository.unlock()
 
997
        return SuccessfulSmartServerResponse(
 
998
            ('ok', ) + tuple(new_write_group_tokens))
 
999
 
 
1000
 
 
1001
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1002
    """Start a write group.
 
1003
 
 
1004
    New in 2.5.
 
1005
    """
 
1006
 
 
1007
    def do_repository_request(self, repository, lock_token):
 
1008
        """Start a write group."""
 
1009
        repository.lock_write(token=lock_token)
 
1010
        try:
 
1011
            repository.start_write_group()
 
1012
            try:
 
1013
                tokens = repository.suspend_write_group()
 
1014
            except errors.UnsuspendableWriteGroup:
 
1015
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1016
        finally:
 
1017
            repository.unlock()
 
1018
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1019
 
 
1020
 
 
1021
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1022
    """Commit a write group.
 
1023
 
 
1024
    New in 2.5.
 
1025
    """
 
1026
 
 
1027
    def do_repository_request(self, repository, lock_token,
 
1028
            write_group_tokens):
 
1029
        """Commit a write group."""
 
1030
        repository.lock_write(token=lock_token)
 
1031
        try:
 
1032
            try:
 
1033
                repository.resume_write_group(write_group_tokens)
 
1034
            except errors.UnresumableWriteGroup, e:
 
1035
                return FailedSmartServerResponse(
 
1036
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1037
            try:
 
1038
                repository.commit_write_group()
 
1039
            except:
 
1040
                write_group_tokens = repository.suspend_write_group()
 
1041
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1042
                # have changed?
 
1043
                raise
 
1044
        finally:
 
1045
            repository.unlock()
 
1046
        return SuccessfulSmartServerResponse(('ok', ))
 
1047
 
 
1048
 
 
1049
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1050
    """Abort a write group.
 
1051
 
 
1052
    New in 2.5.
 
1053
    """
 
1054
 
 
1055
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1056
        """Abort a write group."""
 
1057
        repository.lock_write(token=lock_token)
 
1058
        try:
 
1059
            try:
 
1060
                repository.resume_write_group(write_group_tokens)
 
1061
            except errors.UnresumableWriteGroup, e:
 
1062
                return FailedSmartServerResponse(
 
1063
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1064
                repository.abort_write_group()
 
1065
        finally:
 
1066
            repository.unlock()
 
1067
        return SuccessfulSmartServerResponse(('ok', ))
 
1068
 
 
1069
 
 
1070
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1071
    """Check that a write group is still valid.
 
1072
 
 
1073
    New in 2.5.
 
1074
    """
 
1075
 
 
1076
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1077
        """Abort a write group."""
 
1078
        repository.lock_write(token=lock_token)
 
1079
        try:
 
1080
            try:
 
1081
                repository.resume_write_group(write_group_tokens)
 
1082
            except errors.UnresumableWriteGroup, e:
 
1083
                return FailedSmartServerResponse(
 
1084
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1085
            else:
 
1086
                repository.suspend_write_group()
 
1087
        finally:
 
1088
            repository.unlock()
 
1089
        return SuccessfulSmartServerResponse(('ok', ))
 
1090
 
 
1091
 
 
1092
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1093
    """Retrieve all of the revision ids in a repository.
 
1094
 
 
1095
    New in 2.5.
 
1096
    """
 
1097
 
 
1098
    def do_repository_request(self, repository):
 
1099
        revids = repository.all_revision_ids()
 
1100
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1101
 
 
1102
 
 
1103
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1104
    """Pack a repository.
 
1105
 
 
1106
    New in 2.5.
 
1107
    """
 
1108
 
 
1109
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1110
        self._repository = repository
 
1111
        self._lock_token = lock_token
 
1112
        if clean_obsolete_packs == 'True':
 
1113
            self._clean_obsolete_packs = True
 
1114
        else:
 
1115
            self._clean_obsolete_packs = False
 
1116
        return None
 
1117
 
 
1118
    def do_body(self, body_bytes):
 
1119
        if body_bytes == "":
 
1120
            hint = None
 
1121
        else:
 
1122
            hint = body_bytes.splitlines()
 
1123
        self._repository.lock_write(token=self._lock_token)
 
1124
        try:
 
1125
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1126
        finally:
 
1127
            self._repository.unlock()
 
1128
        return SuccessfulSmartServerResponse(("ok", ), )
 
1129
 
 
1130
 
 
1131
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1132
    """Iterate over the contents of files.
 
1133
 
 
1134
    The client sends a list of desired files to stream, one
 
1135
    per line, and as tuples of file id and revision, separated by
 
1136
    \0.
 
1137
 
 
1138
    The server replies with a stream. Each entry is preceded by a header,
 
1139
    which can either be:
 
1140
 
 
1141
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1142
        list sent by the client. This header is followed by the contents of
 
1143
        the file, bzip2-compressed.
 
1144
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1145
        The client can then raise an appropriate RevisionNotPresent error
 
1146
        or check its fallback repositories.
 
1147
 
 
1148
    New in 2.5.
 
1149
    """
 
1150
 
 
1151
    def body_stream(self, repository, desired_files):
 
1152
        self._repository.lock_read()
 
1153
        try:
 
1154
            text_keys = {}
 
1155
            for i, key in enumerate(desired_files):
 
1156
                text_keys[key] = i
 
1157
            for record in repository.texts.get_record_stream(text_keys,
 
1158
                    'unordered', True):
 
1159
                identifier = text_keys[record.key]
 
1160
                if record.storage_kind == 'absent':
 
1161
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1162
                        record.key[1], identifier)
 
1163
                    # FIXME: Way to abort early?
 
1164
                    continue
 
1165
                yield "ok\0%d\n" % identifier
 
1166
                compressor = zlib.compressobj()
 
1167
                for bytes in record.get_bytes_as('chunked'):
 
1168
                    data = compressor.compress(bytes)
 
1169
                    if data:
 
1170
                        yield data
 
1171
                data = compressor.flush()
 
1172
                if data:
 
1173
                    yield data
 
1174
        finally:
 
1175
            self._repository.unlock()
 
1176
 
 
1177
    def do_body(self, body_bytes):
 
1178
        desired_files = [
 
1179
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1180
        return SuccessfulSmartServerResponse(('ok', ),
 
1181
            body_stream=self.body_stream(self._repository, desired_files))
 
1182
 
 
1183
    def do_repository_request(self, repository):
 
1184
        # Signal that we want a body
 
1185
        return None
 
1186
 
 
1187
 
 
1188
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1189
    """Stream a list of revisions.
 
1190
 
 
1191
    The client sends a list of newline-separated revision ids in the
 
1192
    body of the request and the server replies with the serializer format,
 
1193
    and a stream of bzip2-compressed revision texts (using the specified
 
1194
    serializer format).
 
1195
 
 
1196
    Any revisions the server does not have are omitted from the stream.
 
1197
 
 
1198
    New in 2.5.
 
1199
    """
 
1200
 
 
1201
    def do_repository_request(self, repository):
 
1202
        self._repository = repository
 
1203
        # Signal there is a body
 
1204
        return None
 
1205
 
 
1206
    def do_body(self, body_bytes):
 
1207
        revision_ids = body_bytes.split("\n")
 
1208
        return SuccessfulSmartServerResponse(
 
1209
            ('ok', self._repository.get_serializer_format()),
 
1210
            body_stream=self.body_stream(self._repository, revision_ids))
 
1211
 
 
1212
    def body_stream(self, repository, revision_ids):
 
1213
        self._repository.lock_read()
 
1214
        try:
 
1215
            for record in repository.revisions.get_record_stream(
 
1216
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1217
                if record.storage_kind == 'absent':
 
1218
                    continue
 
1219
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1220
        finally:
 
1221
            self._repository.unlock()