~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/repository.py

  • Committer: Martin Packman
  • Date: 2011-12-19 10:37:57 UTC
  • mto: This revision was merged to the branch mainline in revision 6394.
  • Revision ID: martin.packman@canonical.com-20111219103757-b85as9n9pb7e6qvn
Add tests for deprecated unicode wrapper functions in win32utils

Show diffs side-by-side

added added

removed removed

Lines of Context:
22
22
import sys
23
23
import tempfile
24
24
import threading
 
25
import zlib
25
26
 
26
27
from bzrlib import (
27
28
    bencode,
28
29
    errors,
29
 
    graph,
 
30
    estimate_compressed_size,
30
31
    osutils,
31
32
    pack,
 
33
    trace,
32
34
    ui,
33
 
    versionedfile,
 
35
    vf_search,
34
36
    )
35
37
from bzrlib.bzrdir import BzrDir
36
38
from bzrlib.smart.request import (
82
84
            recreate_search trusts that clients will look for missing things
83
85
            they expected and get it from elsewhere.
84
86
        """
 
87
        if search_bytes == 'everything':
 
88
            return vf_search.EverythingResult(repository), None
85
89
        lines = search_bytes.split('\n')
86
90
        if lines[0] == 'ancestry-of':
87
91
            heads = lines[1:]
88
 
            search_result = graph.PendingAncestryResult(heads, repository)
 
92
            search_result = vf_search.PendingAncestryResult(heads, repository)
89
93
            return search_result, None
90
94
        elif lines[0] == 'search':
91
95
            return self.recreate_search_from_recipe(repository, lines[1:],
115
119
                except StopIteration:
116
120
                    break
117
121
                search.stop_searching_any(exclude_keys.intersection(next_revs))
118
 
            search_result = search.get_result()
119
 
            if (not discard_excess and
120
 
                search_result.get_recipe()[3] != revision_count):
 
122
            (started_keys, excludes, included_keys) = search.get_state()
 
123
            if (not discard_excess and len(included_keys) != revision_count):
121
124
                # we got back a different amount of data than expected, this
122
125
                # gets reported as NoSuchRevision, because less revisions
123
126
                # indicates missing revisions, and more should never happen as
124
127
                # the excludes list considers ghosts and ensures that ghost
125
128
                # filling races are not a problem.
126
129
                return (None, FailedSmartServerResponse(('NoSuchRevision',)))
 
130
            search_result = vf_search.SearchResult(started_keys, excludes,
 
131
                len(included_keys), included_keys)
127
132
            return (search_result, None)
128
133
        finally:
129
134
            repository.unlock()
141
146
            repository.unlock()
142
147
 
143
148
 
 
149
class SmartServerRepositoryBreakLock(SmartServerRepositoryRequest):
 
150
    """Break a repository lock."""
 
151
 
 
152
    def do_repository_request(self, repository):
 
153
        repository.break_lock()
 
154
        return SuccessfulSmartServerResponse(('ok', ))
 
155
 
 
156
 
 
157
_lsprof_count = 0
 
158
 
144
159
class SmartServerRepositoryGetParentMap(SmartServerRepositoryRequest):
145
160
    """Bzr 1.2+ - get parent data for revisions during a graph search."""
146
161
 
179
194
        finally:
180
195
            repository.unlock()
181
196
 
182
 
    def _do_repository_request(self, body_bytes):
183
 
        repository = self._repository
184
 
        revision_ids = set(self._revision_ids)
185
 
        include_missing = 'include-missing:' in revision_ids
186
 
        if include_missing:
187
 
            revision_ids.remove('include-missing:')
188
 
        body_lines = body_bytes.split('\n')
189
 
        search_result, error = self.recreate_search_from_recipe(
190
 
            repository, body_lines)
191
 
        if error is not None:
192
 
            return error
193
 
        # TODO might be nice to start up the search again; but thats not
194
 
        # written or tested yet.
195
 
        client_seen_revs = set(search_result.get_keys())
196
 
        # Always include the requested ids.
197
 
        client_seen_revs.difference_update(revision_ids)
198
 
        lines = []
199
 
        repo_graph = repository.get_graph()
 
197
    def _expand_requested_revs(self, repo_graph, revision_ids, client_seen_revs,
 
198
                               include_missing, max_size=65536):
200
199
        result = {}
201
200
        queried_revs = set()
202
 
        size_so_far = 0
 
201
        estimator = estimate_compressed_size.ZLibEstimator(max_size)
203
202
        next_revs = revision_ids
204
203
        first_loop_done = False
205
204
        while next_revs:
227
226
                    # add parents to the result
228
227
                    result[encoded_id] = parents
229
228
                    # Approximate the serialized cost of this revision_id.
230
 
                    size_so_far += 2 + len(encoded_id) + sum(map(len, parents))
 
229
                    line = '%s %s\n' % (encoded_id, ' '.join(parents))
 
230
                    estimator.add_content(line)
231
231
            # get all the directly asked for parents, and then flesh out to
232
232
            # 64K (compressed) or so. We do one level of depth at a time to
233
233
            # stay in sync with the client. The 250000 magic number is
234
234
            # estimated compression ratio taken from bzr.dev itself.
235
 
            if self.no_extra_results or (
236
 
                first_loop_done and size_so_far > 250000):
 
235
            if self.no_extra_results or (first_loop_done and estimator.full()):
 
236
                trace.mutter('size: %d, z_size: %d'
 
237
                             % (estimator._uncompressed_size_added,
 
238
                                estimator._compressed_size_added))
237
239
                next_revs = set()
238
240
                break
239
241
            # don't query things we've already queried
240
 
            next_revs.difference_update(queried_revs)
 
242
            next_revs = next_revs.difference(queried_revs)
241
243
            first_loop_done = True
 
244
        return result
 
245
 
 
246
    def _do_repository_request(self, body_bytes):
 
247
        repository = self._repository
 
248
        revision_ids = set(self._revision_ids)
 
249
        include_missing = 'include-missing:' in revision_ids
 
250
        if include_missing:
 
251
            revision_ids.remove('include-missing:')
 
252
        body_lines = body_bytes.split('\n')
 
253
        search_result, error = self.recreate_search_from_recipe(
 
254
            repository, body_lines)
 
255
        if error is not None:
 
256
            return error
 
257
        # TODO might be nice to start up the search again; but thats not
 
258
        # written or tested yet.
 
259
        client_seen_revs = set(search_result.get_keys())
 
260
        # Always include the requested ids.
 
261
        client_seen_revs.difference_update(revision_ids)
 
262
 
 
263
        repo_graph = repository.get_graph()
 
264
        result = self._expand_requested_revs(repo_graph, revision_ids,
 
265
                                             client_seen_revs, include_missing)
242
266
 
243
267
        # sorting trivially puts lexographically similar revision ids together.
244
268
        # Compression FTW.
 
269
        lines = []
245
270
        for revision, parents in sorted(result.items()):
246
271
            lines.append(' '.join((revision, ) + tuple(parents)))
247
272
 
312
337
                ('history-incomplete', earliest_revno, earliest_revid))
313
338
 
314
339
 
 
340
class SmartServerRepositoryGetSerializerFormat(SmartServerRepositoryRequest):
 
341
 
 
342
    def do_repository_request(self, repository):
 
343
        """Return the serializer format for this repository.
 
344
 
 
345
        New in 2.5.0.
 
346
 
 
347
        :param repository: The repository to query
 
348
        :return: A smart server response ('ok', FORMAT)
 
349
        """
 
350
        serializer = repository.get_serializer_format()
 
351
        return SuccessfulSmartServerResponse(('ok', serializer))
 
352
 
 
353
 
315
354
class SmartServerRequestHasRevision(SmartServerRepositoryRequest):
316
355
 
317
356
    def do_repository_request(self, repository, revision_id):
319
358
 
320
359
        :param repository: The repository to query in.
321
360
        :param revision_id: The utf8 encoded revision_id to lookup.
322
 
        :return: A smart server response of ('ok', ) if the revision is
323
 
            present.
 
361
        :return: A smart server response of ('yes', ) if the revision is
 
362
            present. ('no', ) if it is missing.
324
363
        """
325
364
        if repository.has_revision(revision_id):
326
365
            return SuccessfulSmartServerResponse(('yes', ))
328
367
            return SuccessfulSmartServerResponse(('no', ))
329
368
 
330
369
 
 
370
class SmartServerRequestHasSignatureForRevisionId(
 
371
        SmartServerRepositoryRequest):
 
372
 
 
373
    def do_repository_request(self, repository, revision_id):
 
374
        """Return ok if a signature is present for a revision.
 
375
 
 
376
        Introduced in bzr 2.5.0.
 
377
 
 
378
        :param repository: The repository to query in.
 
379
        :param revision_id: The utf8 encoded revision_id to lookup.
 
380
        :return: A smart server response of ('yes', ) if a
 
381
            signature for the revision is present,
 
382
            ('no', ) if it is missing.
 
383
        """
 
384
        try:
 
385
            if repository.has_signature_for_revision_id(revision_id):
 
386
                return SuccessfulSmartServerResponse(('yes', ))
 
387
            else:
 
388
                return SuccessfulSmartServerResponse(('no', ))
 
389
        except errors.NoSuchRevision:
 
390
            return FailedSmartServerResponse(
 
391
                ('nosuchrevision', revision_id))
 
392
 
 
393
 
331
394
class SmartServerRepositoryGatherStats(SmartServerRepositoryRequest):
332
395
 
333
396
    def do_repository_request(self, repository, revid, committers):
353
416
            decoded_committers = True
354
417
        else:
355
418
            decoded_committers = None
356
 
        stats = repository.gather_stats(decoded_revision_id, decoded_committers)
 
419
        try:
 
420
            stats = repository.gather_stats(decoded_revision_id,
 
421
                decoded_committers)
 
422
        except errors.NoSuchRevision:
 
423
            return FailedSmartServerResponse(('nosuchrevision', revid))
357
424
 
358
425
        body = ''
359
426
        if stats.has_key('committers'):
370
437
        return SuccessfulSmartServerResponse(('ok', ), body)
371
438
 
372
439
 
 
440
class SmartServerRepositoryGetRevisionSignatureText(
 
441
        SmartServerRepositoryRequest):
 
442
    """Return the signature text of a revision.
 
443
 
 
444
    New in 2.5.
 
445
    """
 
446
 
 
447
    def do_repository_request(self, repository, revision_id):
 
448
        """Return the result of repository.get_signature_text().
 
449
 
 
450
        :param repository: The repository to query in.
 
451
        :return: A smart server response of with the signature text as
 
452
            body.
 
453
        """
 
454
        try:
 
455
            text = repository.get_signature_text(revision_id)
 
456
        except errors.NoSuchRevision, err:
 
457
            return FailedSmartServerResponse(
 
458
                ('nosuchrevision', err.revision))
 
459
        return SuccessfulSmartServerResponse(('ok', ), text)
 
460
 
 
461
 
373
462
class SmartServerRepositoryIsShared(SmartServerRepositoryRequest):
374
463
 
375
464
    def do_repository_request(self, repository):
385
474
            return SuccessfulSmartServerResponse(('no', ))
386
475
 
387
476
 
 
477
class SmartServerRepositoryMakeWorkingTrees(SmartServerRepositoryRequest):
 
478
 
 
479
    def do_repository_request(self, repository):
 
480
        """Return the result of repository.make_working_trees().
 
481
 
 
482
        Introduced in bzr 2.5.0.
 
483
 
 
484
        :param repository: The repository to query in.
 
485
        :return: A smart server response of ('yes', ) if the repository uses
 
486
            working trees, and ('no', ) if it is not.
 
487
        """
 
488
        if repository.make_working_trees():
 
489
            return SuccessfulSmartServerResponse(('yes', ))
 
490
        else:
 
491
            return SuccessfulSmartServerResponse(('no', ))
 
492
 
 
493
 
388
494
class SmartServerRepositoryLockWrite(SmartServerRepositoryRequest):
389
495
 
390
496
    def do_repository_request(self, repository, token=''):
392
498
        if token == '':
393
499
            token = None
394
500
        try:
395
 
            token = repository.lock_write(token=token)
 
501
            token = repository.lock_write(token=token).repository_token
396
502
        except errors.LockContention, e:
397
503
            return FailedSmartServerResponse(('LockContention',))
398
504
        except errors.UnlockableTransport:
413
519
    def do_repository_request(self, repository, to_network_name):
414
520
        """Get a stream for inserting into a to_format repository.
415
521
 
 
522
        The request body is 'search_bytes', a description of the revisions
 
523
        being requested.
 
524
 
 
525
        In 2.3 this verb added support for search_bytes == 'everything'.  Older
 
526
        implementations will respond with a BadSearch error, and clients should
 
527
        catch this and fallback appropriately.
 
528
 
416
529
        :param repository: The repository to stream from.
417
530
        :param to_network_name: The network name of the format of the target
418
531
            repository.
490
603
 
491
604
 
492
605
class SmartServerRepositoryGetStream_1_19(SmartServerRepositoryGetStream):
 
606
    """The same as Repository.get_stream, but will return stream CHK formats to
 
607
    clients.
 
608
 
 
609
    See SmartServerRepositoryGetStream._should_fake_unknown.
 
610
    
 
611
    New in 1.19.
 
612
    """
493
613
 
494
614
    def _should_fake_unknown(self):
495
615
        """Returns False; we don't need to workaround bugs in 1.19+ clients."""
505
625
        for record in substream:
506
626
            if record.storage_kind in ('chunked', 'fulltext'):
507
627
                serialised = record_to_fulltext_bytes(record)
508
 
            elif record.storage_kind == 'inventory-delta':
509
 
                serialised = record_to_inventory_delta_bytes(record)
510
628
            elif record.storage_kind == 'absent':
511
629
                raise ValueError("Absent factory for %s" % (record.key,))
512
630
            else:
544
662
    :ivar first_bytes: The first bytes to give the next NetworkRecordStream.
545
663
    """
546
664
 
547
 
    def __init__(self, byte_stream):
 
665
    def __init__(self, byte_stream, record_counter):
548
666
        """Create a _ByteStreamDecoder."""
549
667
        self.stream_decoder = pack.ContainerPushParser()
550
668
        self.current_type = None
551
669
        self.first_bytes = None
552
670
        self.byte_stream = byte_stream
 
671
        self._record_counter = record_counter
 
672
        self.key_count = 0
553
673
 
554
674
    def iter_stream_decoder(self):
555
675
        """Iterate the contents of the pack from stream_decoder."""
580
700
 
581
701
    def record_stream(self):
582
702
        """Yield substream_type, substream from the byte stream."""
 
703
        def wrap_and_count(pb, rc, substream):
 
704
            """Yield records from stream while showing progress."""
 
705
            counter = 0
 
706
            if rc:
 
707
                if self.current_type != 'revisions' and self.key_count != 0:
 
708
                    # As we know the number of revisions now (in self.key_count)
 
709
                    # we can setup and use record_counter (rc).
 
710
                    if not rc.is_initialized():
 
711
                        rc.setup(self.key_count, self.key_count)
 
712
            for record in substream.read():
 
713
                if rc:
 
714
                    if rc.is_initialized() and counter == rc.STEP:
 
715
                        rc.increment(counter)
 
716
                        pb.update('Estimate', rc.current, rc.max)
 
717
                        counter = 0
 
718
                    if self.current_type == 'revisions':
 
719
                        # Total records is proportional to number of revs
 
720
                        # to fetch. With remote, we used self.key_count to
 
721
                        # track the number of revs. Once we have the revs
 
722
                        # counts in self.key_count, the progress bar changes
 
723
                        # from 'Estimating..' to 'Estimate' above.
 
724
                        self.key_count += 1
 
725
                        if counter == rc.STEP:
 
726
                            pb.update('Estimating..', self.key_count)
 
727
                            counter = 0
 
728
                counter += 1
 
729
                yield record
 
730
 
583
731
        self.seed_state()
 
732
        pb = ui.ui_factory.nested_progress_bar()
 
733
        rc = self._record_counter
584
734
        # Make and consume sub generators, one per substream type:
585
735
        while self.first_bytes is not None:
586
736
            substream = NetworkRecordStream(self.iter_substream_bytes())
587
737
            # after substream is fully consumed, self.current_type is set to
588
738
            # the next type, and self.first_bytes is set to the matching bytes.
589
 
            yield self.current_type, substream.read()
 
739
            yield self.current_type, wrap_and_count(pb, rc, substream)
 
740
        if rc:
 
741
            pb.update('Done', rc.max, rc.max)
 
742
        pb.finished()
590
743
 
591
744
    def seed_state(self):
592
745
        """Prepare the _ByteStreamDecoder to decode from the pack stream."""
597
750
        list(self.iter_substream_bytes())
598
751
 
599
752
 
600
 
def _byte_stream_to_stream(byte_stream):
 
753
def _byte_stream_to_stream(byte_stream, record_counter=None):
601
754
    """Convert a byte stream into a format and a stream.
602
755
 
603
756
    :param byte_stream: A bytes iterator, as output by _stream_to_byte_stream.
604
757
    :return: (RepositoryFormat, stream_generator)
605
758
    """
606
 
    decoder = _ByteStreamDecoder(byte_stream)
 
759
    decoder = _ByteStreamDecoder(byte_stream, record_counter)
607
760
    for bytes in byte_stream:
608
761
        decoder.stream_decoder.accept_bytes(bytes)
609
762
        for record in decoder.stream_decoder.read_pending_records(max=1):
624
777
        return SuccessfulSmartServerResponse(('ok',))
625
778
 
626
779
 
 
780
class SmartServerRepositoryGetPhysicalLockStatus(SmartServerRepositoryRequest):
 
781
    """Get the physical lock status for a repository.
 
782
 
 
783
    New in 2.5.
 
784
    """
 
785
 
 
786
    def do_repository_request(self, repository):
 
787
        if repository.get_physical_lock_status():
 
788
            return SuccessfulSmartServerResponse(('yes', ))
 
789
        else:
 
790
            return SuccessfulSmartServerResponse(('no', ))
 
791
 
 
792
 
627
793
class SmartServerRepositorySetMakeWorkingTrees(SmartServerRepositoryRequest):
628
794
 
629
795
    def do_repository_request(self, repository, str_bool_new_value):
792
958
        self.do_insert_stream_request(repository, resume_tokens)
793
959
 
794
960
 
 
961
class SmartServerRepositoryAddSignatureText(SmartServerRepositoryRequest):
 
962
    """Add a revision signature text.
 
963
 
 
964
    New in 2.5.
 
965
    """
 
966
 
 
967
    def do_repository_request(self, repository, lock_token, revision_id,
 
968
            *write_group_tokens):
 
969
        """Add a revision signature text.
 
970
 
 
971
        :param repository: Repository to operate on
 
972
        :param lock_token: Lock token
 
973
        :param revision_id: Revision for which to add signature
 
974
        :param write_group_tokens: Write group tokens
 
975
        """
 
976
        self._lock_token = lock_token
 
977
        self._revision_id = revision_id
 
978
        self._write_group_tokens = write_group_tokens
 
979
        return None
 
980
 
 
981
    def do_body(self, body_bytes):
 
982
        """Add a signature text.
 
983
 
 
984
        :param body_bytes: GPG signature text
 
985
        :return: SuccessfulSmartServerResponse with arguments 'ok' and
 
986
            the list of new write group tokens.
 
987
        """
 
988
        self._repository.lock_write(token=self._lock_token)
 
989
        try:
 
990
            self._repository.resume_write_group(self._write_group_tokens)
 
991
            try:
 
992
                self._repository.add_signature_text(self._revision_id,
 
993
                    body_bytes)
 
994
            finally:
 
995
                new_write_group_tokens = self._repository.suspend_write_group()
 
996
        finally:
 
997
            self._repository.unlock()
 
998
        return SuccessfulSmartServerResponse(
 
999
            ('ok', ) + tuple(new_write_group_tokens))
 
1000
 
 
1001
 
 
1002
class SmartServerRepositoryStartWriteGroup(SmartServerRepositoryRequest):
 
1003
    """Start a write group.
 
1004
 
 
1005
    New in 2.5.
 
1006
    """
 
1007
 
 
1008
    def do_repository_request(self, repository, lock_token):
 
1009
        """Start a write group."""
 
1010
        repository.lock_write(token=lock_token)
 
1011
        try:
 
1012
            repository.start_write_group()
 
1013
            try:
 
1014
                tokens = repository.suspend_write_group()
 
1015
            except errors.UnsuspendableWriteGroup:
 
1016
                return FailedSmartServerResponse(('UnsuspendableWriteGroup',))
 
1017
        finally:
 
1018
            repository.unlock()
 
1019
        return SuccessfulSmartServerResponse(('ok', tokens))
 
1020
 
 
1021
 
 
1022
class SmartServerRepositoryCommitWriteGroup(SmartServerRepositoryRequest):
 
1023
    """Commit a write group.
 
1024
 
 
1025
    New in 2.5.
 
1026
    """
 
1027
 
 
1028
    def do_repository_request(self, repository, lock_token,
 
1029
            write_group_tokens):
 
1030
        """Commit a write group."""
 
1031
        repository.lock_write(token=lock_token)
 
1032
        try:
 
1033
            try:
 
1034
                repository.resume_write_group(write_group_tokens)
 
1035
            except errors.UnresumableWriteGroup, e:
 
1036
                return FailedSmartServerResponse(
 
1037
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1038
            try:
 
1039
                repository.commit_write_group()
 
1040
            except:
 
1041
                write_group_tokens = repository.suspend_write_group()
 
1042
                # FIXME JRV 2011-11-19: What if the write_group_tokens
 
1043
                # have changed?
 
1044
                raise
 
1045
        finally:
 
1046
            repository.unlock()
 
1047
        return SuccessfulSmartServerResponse(('ok', ))
 
1048
 
 
1049
 
 
1050
class SmartServerRepositoryAbortWriteGroup(SmartServerRepositoryRequest):
 
1051
    """Abort a write group.
 
1052
 
 
1053
    New in 2.5.
 
1054
    """
 
1055
 
 
1056
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1057
        """Abort a write group."""
 
1058
        repository.lock_write(token=lock_token)
 
1059
        try:
 
1060
            try:
 
1061
                repository.resume_write_group(write_group_tokens)
 
1062
            except errors.UnresumableWriteGroup, e:
 
1063
                return FailedSmartServerResponse(
 
1064
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1065
                repository.abort_write_group()
 
1066
        finally:
 
1067
            repository.unlock()
 
1068
        return SuccessfulSmartServerResponse(('ok', ))
 
1069
 
 
1070
 
 
1071
class SmartServerRepositoryCheckWriteGroup(SmartServerRepositoryRequest):
 
1072
    """Check that a write group is still valid.
 
1073
 
 
1074
    New in 2.5.
 
1075
    """
 
1076
 
 
1077
    def do_repository_request(self, repository, lock_token, write_group_tokens):
 
1078
        """Abort a write group."""
 
1079
        repository.lock_write(token=lock_token)
 
1080
        try:
 
1081
            try:
 
1082
                repository.resume_write_group(write_group_tokens)
 
1083
            except errors.UnresumableWriteGroup, e:
 
1084
                return FailedSmartServerResponse(
 
1085
                    ('UnresumableWriteGroup', e.write_groups, e.reason))
 
1086
            else:
 
1087
                repository.suspend_write_group()
 
1088
        finally:
 
1089
            repository.unlock()
 
1090
        return SuccessfulSmartServerResponse(('ok', ))
 
1091
 
 
1092
 
 
1093
class SmartServerRepositoryAllRevisionIds(SmartServerRepositoryRequest):
 
1094
    """Retrieve all of the revision ids in a repository.
 
1095
 
 
1096
    New in 2.5.
 
1097
    """
 
1098
 
 
1099
    def do_repository_request(self, repository):
 
1100
        revids = repository.all_revision_ids()
 
1101
        return SuccessfulSmartServerResponse(("ok", ), "\n".join(revids))
 
1102
 
 
1103
 
 
1104
class SmartServerRepositoryReconcile(SmartServerRepositoryRequest):
 
1105
    """Reconcile a repository.
 
1106
 
 
1107
    New in 2.5.
 
1108
    """
 
1109
 
 
1110
    def do_repository_request(self, repository, lock_token):
 
1111
        try:
 
1112
            repository.lock_write(token=lock_token)
 
1113
        except errors.TokenLockingNotSupported, e:
 
1114
            return FailedSmartServerResponse(
 
1115
                ('TokenLockingNotSupported', ))
 
1116
        try:
 
1117
            reconciler = repository.reconcile()
 
1118
        finally:
 
1119
            repository.unlock()
 
1120
        body = [
 
1121
            "garbage_inventories: %d\n" % reconciler.garbage_inventories,
 
1122
            "inconsistent_parents: %d\n" % reconciler.inconsistent_parents,
 
1123
            ]
 
1124
        return SuccessfulSmartServerResponse(('ok', ), "".join(body))
 
1125
 
 
1126
 
 
1127
class SmartServerRepositoryPack(SmartServerRepositoryRequest):
 
1128
    """Pack a repository.
 
1129
 
 
1130
    New in 2.5.
 
1131
    """
 
1132
 
 
1133
    def do_repository_request(self, repository, lock_token, clean_obsolete_packs):
 
1134
        self._repository = repository
 
1135
        self._lock_token = lock_token
 
1136
        if clean_obsolete_packs == 'True':
 
1137
            self._clean_obsolete_packs = True
 
1138
        else:
 
1139
            self._clean_obsolete_packs = False
 
1140
        return None
 
1141
 
 
1142
    def do_body(self, body_bytes):
 
1143
        if body_bytes == "":
 
1144
            hint = None
 
1145
        else:
 
1146
            hint = body_bytes.splitlines()
 
1147
        self._repository.lock_write(token=self._lock_token)
 
1148
        try:
 
1149
            self._repository.pack(hint, self._clean_obsolete_packs)
 
1150
        finally:
 
1151
            self._repository.unlock()
 
1152
        return SuccessfulSmartServerResponse(("ok", ), )
 
1153
 
 
1154
 
 
1155
class SmartServerRepositoryIterFilesBytes(SmartServerRepositoryRequest):
 
1156
    """Iterate over the contents of files.
 
1157
 
 
1158
    The client sends a list of desired files to stream, one
 
1159
    per line, and as tuples of file id and revision, separated by
 
1160
    \0.
 
1161
 
 
1162
    The server replies with a stream. Each entry is preceded by a header,
 
1163
    which can either be:
 
1164
 
 
1165
    * "ok\x00IDX\n" where IDX is the index of the entry in the desired files
 
1166
        list sent by the client. This header is followed by the contents of
 
1167
        the file, bzip2-compressed.
 
1168
    * "absent\x00FILEID\x00REVISION\x00IDX" to indicate a text is missing.
 
1169
        The client can then raise an appropriate RevisionNotPresent error
 
1170
        or check its fallback repositories.
 
1171
 
 
1172
    New in 2.5.
 
1173
    """
 
1174
 
 
1175
    def body_stream(self, repository, desired_files):
 
1176
        self._repository.lock_read()
 
1177
        try:
 
1178
            text_keys = {}
 
1179
            for i, key in enumerate(desired_files):
 
1180
                text_keys[key] = i
 
1181
            for record in repository.texts.get_record_stream(text_keys,
 
1182
                    'unordered', True):
 
1183
                identifier = text_keys[record.key]
 
1184
                if record.storage_kind == 'absent':
 
1185
                    yield "absent\0%s\0%s\0%d\n" % (record.key[0],
 
1186
                        record.key[1], identifier)
 
1187
                    # FIXME: Way to abort early?
 
1188
                    continue
 
1189
                yield "ok\0%d\n" % identifier
 
1190
                compressor = zlib.compressobj()
 
1191
                for bytes in record.get_bytes_as('chunked'):
 
1192
                    data = compressor.compress(bytes)
 
1193
                    if data:
 
1194
                        yield data
 
1195
                data = compressor.flush()
 
1196
                if data:
 
1197
                    yield data
 
1198
        finally:
 
1199
            self._repository.unlock()
 
1200
 
 
1201
    def do_body(self, body_bytes):
 
1202
        desired_files = [
 
1203
            tuple(l.split("\0")) for l in body_bytes.splitlines()]
 
1204
        return SuccessfulSmartServerResponse(('ok', ),
 
1205
            body_stream=self.body_stream(self._repository, desired_files))
 
1206
 
 
1207
    def do_repository_request(self, repository):
 
1208
        # Signal that we want a body
 
1209
        return None
 
1210
 
 
1211
 
 
1212
class SmartServerRepositoryIterRevisions(SmartServerRepositoryRequest):
 
1213
    """Stream a list of revisions.
 
1214
 
 
1215
    The client sends a list of newline-separated revision ids in the
 
1216
    body of the request and the server replies with the serializer format,
 
1217
    and a stream of bzip2-compressed revision texts (using the specified
 
1218
    serializer format).
 
1219
 
 
1220
    Any revisions the server does not have are omitted from the stream.
 
1221
 
 
1222
    New in 2.5.
 
1223
    """
 
1224
 
 
1225
    def do_repository_request(self, repository):
 
1226
        self._repository = repository
 
1227
        # Signal there is a body
 
1228
        return None
 
1229
 
 
1230
    def do_body(self, body_bytes):
 
1231
        revision_ids = body_bytes.split("\n")
 
1232
        return SuccessfulSmartServerResponse(
 
1233
            ('ok', self._repository.get_serializer_format()),
 
1234
            body_stream=self.body_stream(self._repository, revision_ids))
 
1235
 
 
1236
    def body_stream(self, repository, revision_ids):
 
1237
        self._repository.lock_read()
 
1238
        try:
 
1239
            for record in repository.revisions.get_record_stream(
 
1240
                [(revid,) for revid in revision_ids], 'unordered', True):
 
1241
                if record.storage_kind == 'absent':
 
1242
                    continue
 
1243
                yield zlib.compress(record.get_bytes_as('fulltext'))
 
1244
        finally:
 
1245
            self._repository.unlock()