~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/gpg.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2008-03-16 16:58:03 UTC
  • mfrom: (3224.3.1 news-typo)
  • Revision ID: pqm@pqm.ubuntu.com-20080316165803-tisoc9mpob9z544o
(Matt Nordhoff) Trivial NEWS typo fix

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2011 Canonical Ltd
 
1
# Copyright (C) 2005 Canonical Ltd
2
2
#   Authors: Robert Collins <robert.collins@canonical.com>
3
3
#
4
4
# This program is free software; you can redistribute it and/or modify
13
13
#
14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
 
18
18
"""GPG signing and checking logic."""
19
19
 
20
20
import os
21
21
import sys
22
 
from StringIO import StringIO
23
22
 
24
23
from bzrlib.lazy_import import lazy_import
25
24
lazy_import(globals(), """
31
30
    trace,
32
31
    ui,
33
32
    )
34
 
from bzrlib.i18n import (
35
 
    gettext, 
36
 
    ngettext,
37
 
    )
38
33
""")
39
34
 
40
 
#verification results
41
 
SIGNATURE_VALID = 0
42
 
SIGNATURE_KEY_MISSING = 1
43
 
SIGNATURE_NOT_VALID = 2
44
 
SIGNATURE_NOT_SIGNED = 3
45
 
SIGNATURE_EXPIRED = 4
46
 
 
47
35
 
48
36
class DisabledGPGStrategy(object):
49
37
    """A GPG Strategy that makes everything fail."""
50
38
 
51
 
    @staticmethod
52
 
    def verify_signatures_available():
53
 
        return True
54
 
 
55
39
    def __init__(self, ignored):
56
40
        """Real strategies take a configuration."""
57
41
 
58
42
    def sign(self, content):
59
43
        raise errors.SigningFailed('Signing is disabled.')
60
44
 
61
 
    def verify(self, content, testament):
62
 
        raise errors.SignatureVerificationFailed('Signature verification is \
63
 
disabled.')
64
 
 
65
 
    def set_acceptable_keys(self, command_line_input):
66
 
        pass
67
 
 
68
45
 
69
46
class LoopbackGPGStrategy(object):
70
 
    """A GPG Strategy that acts like 'cat' - data is just passed through.
71
 
    Used in tests.
72
 
    """
73
 
 
74
 
    @staticmethod
75
 
    def verify_signatures_available():
76
 
        return True
 
47
    """A GPG Strategy that acts like 'cat' - data is just passed through."""
77
48
 
78
49
    def __init__(self, ignored):
79
50
        """Real strategies take a configuration."""
82
53
        return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
83
54
                "-----END PSEUDO-SIGNED CONTENT-----\n")
84
55
 
85
 
    def verify(self, content, testament):
86
 
        return SIGNATURE_VALID, None
87
 
 
88
 
    def set_acceptable_keys(self, command_line_input):
89
 
        if command_line_input is not None:
90
 
            patterns = command_line_input.split(",")
91
 
            self.acceptable_keys = []
92
 
            for pattern in patterns:
93
 
                if pattern == "unknown":
94
 
                    pass
95
 
                else:
96
 
                    self.acceptable_keys.append(pattern)
97
 
 
98
 
    def do_verifications(self, revisions, repository):
99
 
        count = {SIGNATURE_VALID: 0,
100
 
                 SIGNATURE_KEY_MISSING: 0,
101
 
                 SIGNATURE_NOT_VALID: 0,
102
 
                 SIGNATURE_NOT_SIGNED: 0,
103
 
                 SIGNATURE_EXPIRED: 0}
104
 
        result = []
105
 
        all_verifiable = True
106
 
        for rev_id in revisions:
107
 
            verification_result, uid =\
108
 
                                repository.verify_revision(rev_id,self)
109
 
            result.append([rev_id, verification_result, uid])
110
 
            count[verification_result] += 1
111
 
            if verification_result != SIGNATURE_VALID:
112
 
                all_verifiable = False
113
 
        return (count, result, all_verifiable)
114
 
 
115
 
    def valid_commits_message(self, count):
116
 
        return gettext(u"{0} commits with valid signatures").format(
117
 
                                        count[SIGNATURE_VALID])            
118
 
 
119
 
    def unknown_key_message(self, count):
120
 
        return ngettext(u"{0} commit with unknown key",
121
 
                             u"{0} commits with unknown keys",
122
 
                             count[SIGNATURE_KEY_MISSING]).format(
123
 
                                        count[SIGNATURE_KEY_MISSING])
124
 
 
125
 
    def commit_not_valid_message(self, count):
126
 
        return ngettext(u"{0} commit not valid",
127
 
                             u"{0} commits not valid",
128
 
                             count[SIGNATURE_NOT_VALID]).format(
129
 
                                            count[SIGNATURE_NOT_VALID])
130
 
 
131
 
    def commit_not_signed_message(self, count):
132
 
        return ngettext(u"{0} commit not signed",
133
 
                             u"{0} commits not signed",
134
 
                             count[SIGNATURE_NOT_SIGNED]).format(
135
 
                                        count[SIGNATURE_NOT_SIGNED])
136
 
 
137
 
    def expired_commit_message(self, count):
138
 
        return ngettext(u"{0} commit with key now expired",
139
 
                        u"{0} commits with key now expired",
140
 
                        count[SIGNATURE_EXPIRED]).format(
141
 
                                        count[SIGNATURE_EXPIRED])
142
 
 
143
56
 
144
57
def _set_gpg_tty():
145
58
    tty = os.environ.get('TTY')
156
69
 
157
70
class GPGStrategy(object):
158
71
    """GPG Signing and checking facilities."""
159
 
 
160
 
    acceptable_keys = None
161
 
 
162
 
    @staticmethod
163
 
    def verify_signatures_available():
164
 
        """
165
 
        check if this strategy can verify signatures
166
 
 
167
 
        :return: True if this strategy can verify signatures, False otherwise
168
 
        """
169
 
        try:
170
 
            import gpgme
171
 
            return True
172
 
        except ImportError, error:
173
 
            return False
174
 
 
 
72
        
175
73
    def _command_line(self):
176
 
        
177
 
        return [self._config.gpg_signing_command(), '--clearsign', '-u',
178
 
                                                self._config.gpg_signing_key()]
 
74
        return [self._config.gpg_signing_command(), '--clearsign']
179
75
 
180
76
    def __init__(self, config):
181
77
        self._config = config
182
 
        try:
183
 
            import gpgme
184
 
            self.context = gpgme.Context()
185
 
        except ImportError, error:
186
 
            pass # can't use verify()
187
78
 
188
79
    def sign(self, content):
189
80
        if isinstance(content, unicode):
220
111
                raise errors.SigningFailed(self._command_line())
221
112
            else:
222
113
                raise
223
 
 
224
 
    def verify(self, content, testament):
225
 
        """Check content has a valid signature.
226
 
        
227
 
        :param content: the commit signature
228
 
        :param testament: the valid testament string for the commit
229
 
        
230
 
        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
231
 
        """
232
 
        try:
233
 
            import gpgme
234
 
        except ImportError, error:
235
 
            raise errors.GpgmeNotInstalled(error)
236
 
 
237
 
        signature = StringIO(content)
238
 
        plain_output = StringIO()
239
 
        
240
 
        try:
241
 
            result = self.context.verify(signature, None, plain_output)
242
 
        except gpgme.GpgmeError,error:
243
 
            raise errors.SignatureVerificationFailed(error[2])
244
 
 
245
 
        # No result if input is invalid.
246
 
        # test_verify_invalid()
247
 
        if len(result) == 0:
248
 
            return SIGNATURE_NOT_VALID, None
249
 
        # User has specified a list of acceptable keys, check our result is in it.
250
 
        # test_verify_unacceptable_key()
251
 
        fingerprint = result[0].fpr
252
 
        if self.acceptable_keys is not None:
253
 
            if not fingerprint in self.acceptable_keys:                
254
 
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
255
 
        # Check the signature actually matches the testament.
256
 
        # test_verify_bad_testament()
257
 
        if testament != plain_output.getvalue():
258
 
            return SIGNATURE_NOT_VALID, None 
259
 
        # Yay gpgme set the valid bit.
260
 
        # Can't write a test for this one as you can't set a key to be
261
 
        # trusted using gpgme.
262
 
        if result[0].summary & gpgme.SIGSUM_VALID:
263
 
            key = self.context.get_key(fingerprint)
264
 
            name = key.uids[0].name
265
 
            email = key.uids[0].email
266
 
            return SIGNATURE_VALID, name + " <" + email + ">"
267
 
        # SIGSUM_RED indicates a problem; unfortunately I have not been able
268
 
        # to write any tests which actually set this.
269
 
        if result[0].summary & gpgme.SIGSUM_RED:
270
 
            return SIGNATURE_NOT_VALID, None
271
 
        # GPG does not know this key.
272
 
        # test_verify_unknown_key()
273
 
        if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
274
 
            return SIGNATURE_KEY_MISSING, fingerprint[-8:]
275
 
        # Summary isn't set if sig is valid but key is untrusted
276
 
        # but if user has explicitly set the key as acceptable we can validate it.
277
 
        if result[0].summary == 0 and self.acceptable_keys is not None:
278
 
            if fingerprint in self.acceptable_keys:
279
 
                # test_verify_untrusted_but_accepted()
280
 
                return SIGNATURE_VALID, None 
281
 
        # test_verify_valid_but_untrusted()
282
 
        if result[0].summary == 0 and self.acceptable_keys is None:
283
 
            return SIGNATURE_NOT_VALID, None
284
 
        if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
285
 
            expires = self.context.get_key(result[0].fpr).subkeys[0].expires
286
 
            if expires > result[0].timestamp:
287
 
                # The expired key was not expired at time of signing.
288
 
                # test_verify_expired_but_valid()
289
 
                return SIGNATURE_EXPIRED, fingerprint[-8:]
290
 
            else:
291
 
                # I can't work out how to create a test where the signature
292
 
                # was expired at the time of signing.
293
 
                return SIGNATURE_NOT_VALID, None
294
 
        # A signature from a revoked key gets this.
295
 
        # test_verify_revoked_signature()
296
 
        if result[0].summary & gpgme.SIGSUM_SYS_ERROR:
297
 
            return SIGNATURE_NOT_VALID, None
298
 
        # Other error types such as revoked keys should (I think) be caught by
299
 
        # SIGSUM_RED so anything else means something is buggy.
300
 
        raise errors.SignatureVerificationFailed("Unknown GnuPG key "\
301
 
                                                 "verification result")
302
 
 
303
 
    def set_acceptable_keys(self, command_line_input):
304
 
        """sets the acceptable keys for verifying with this GPGStrategy
305
 
        
306
 
        :param command_line_input: comma separated list of patterns from
307
 
                                command line
308
 
        :return: nothing
309
 
        """
310
 
        key_patterns = None
311
 
        acceptable_keys_config = self._config.acceptable_keys()
312
 
        try:
313
 
            if isinstance(acceptable_keys_config, unicode):
314
 
                acceptable_keys_config = str(acceptable_keys_config)
315
 
        except UnicodeEncodeError:
316
 
            #gpg Context.keylist(pattern) does not like unicode
317
 
            raise errors.BzrCommandError(gettext('Only ASCII permitted in option names'))
318
 
 
319
 
        if acceptable_keys_config is not None:
320
 
            key_patterns = acceptable_keys_config
321
 
        if command_line_input is not None: #command line overrides config
322
 
            key_patterns = command_line_input
323
 
        if key_patterns is not None:
324
 
            patterns = key_patterns.split(",")
325
 
 
326
 
            self.acceptable_keys = []
327
 
            for pattern in patterns:
328
 
                result = self.context.keylist(pattern)
329
 
                found_key = False
330
 
                for key in result:
331
 
                    found_key = True
332
 
                    self.acceptable_keys.append(key.subkeys[0].fpr)
333
 
                    trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
334
 
                if not found_key:
335
 
                    trace.note(gettext(
336
 
                            "No GnuPG key results for pattern: {0}"
337
 
                                ).format(pattern))
338
 
 
339
 
    def do_verifications(self, revisions, repository,
340
 
                            process_events_callback=None):
341
 
        """do verifications on a set of revisions
342
 
        
343
 
        :param revisions: list of revision ids to verify
344
 
        :param repository: repository object
345
 
        :param process_events_callback: method to call for GUI frontends that
346
 
                                                want to keep their UI refreshed
347
 
        
348
 
        :return: count dictionary of results of each type,
349
 
                 result list for each revision,
350
 
                 boolean True if all results are verified successfully
351
 
        """
352
 
        count = {SIGNATURE_VALID: 0,
353
 
                 SIGNATURE_KEY_MISSING: 0,
354
 
                 SIGNATURE_NOT_VALID: 0,
355
 
                 SIGNATURE_NOT_SIGNED: 0,
356
 
                 SIGNATURE_EXPIRED: 0}
357
 
        result = []
358
 
        all_verifiable = True
359
 
        for rev_id in revisions:
360
 
            verification_result, uid =\
361
 
                                repository.verify_revision(rev_id,self)
362
 
            result.append([rev_id, verification_result, uid])
363
 
            count[verification_result] += 1
364
 
            if verification_result != SIGNATURE_VALID:
365
 
                all_verifiable = False
366
 
            if process_events_callback is not None:
367
 
                process_events_callback()
368
 
        return (count, result, all_verifiable)
369
 
 
370
 
    def verbose_valid_message(self, result):
371
 
        """takes a verify result and returns list of signed commits strings"""
372
 
        signers = {}
373
 
        for rev_id, validity, uid in result:
374
 
            if validity == SIGNATURE_VALID:
375
 
                signers.setdefault(uid, 0)
376
 
                signers[uid] += 1
377
 
        result = []
378
 
        for uid, number in signers.items():
379
 
             result.append( ngettext(u"{0} signed {1} commit", 
380
 
                             u"{0} signed {1} commits",
381
 
                             number).format(uid, number) )
382
 
        return result
383
 
 
384
 
 
385
 
    def verbose_not_valid_message(self, result, repo):
386
 
        """takes a verify result and returns list of not valid commit info"""
387
 
        signers = {}
388
 
        for rev_id, validity, empty in result:
389
 
            if validity == SIGNATURE_NOT_VALID:
390
 
                revision = repo.get_revision(rev_id)
391
 
                authors = ', '.join(revision.get_apparent_authors())
392
 
                signers.setdefault(authors, 0)
393
 
                signers[authors] += 1
394
 
        result = []
395
 
        for authors, number in signers.items():
396
 
            result.append( ngettext(u"{0} commit by author {1}", 
397
 
                                 u"{0} commits by author {1}",
398
 
                                 number).format(number, authors) )
399
 
        return result
400
 
 
401
 
    def verbose_not_signed_message(self, result, repo):
402
 
        """takes a verify result and returns list of not signed commit info"""
403
 
        signers = {}
404
 
        for rev_id, validity, empty in result:
405
 
            if validity == SIGNATURE_NOT_SIGNED:
406
 
                revision = repo.get_revision(rev_id)
407
 
                authors = ', '.join(revision.get_apparent_authors())
408
 
                signers.setdefault(authors, 0)
409
 
                signers[authors] += 1
410
 
        result = []
411
 
        for authors, number in signers.items():
412
 
            result.append( ngettext(u"{0} commit by author {1}", 
413
 
                                 u"{0} commits by author {1}",
414
 
                                 number).format(number, authors) )
415
 
        return result
416
 
 
417
 
    def verbose_missing_key_message(self, result):
418
 
        """takes a verify result and returns list of missing key info"""
419
 
        signers = {}
420
 
        for rev_id, validity, fingerprint in result:
421
 
            if validity == SIGNATURE_KEY_MISSING:
422
 
                signers.setdefault(fingerprint, 0)
423
 
                signers[fingerprint] += 1
424
 
        result = []
425
 
        for fingerprint, number in signers.items():
426
 
            result.append( ngettext(u"Unknown key {0} signed {1} commit", 
427
 
                                 u"Unknown key {0} signed {1} commits",
428
 
                                 number).format(fingerprint, number) )
429
 
        return result
430
 
 
431
 
    def verbose_expired_key_message(self, result, repo):
432
 
        """takes a verify result and returns list of expired key info"""
433
 
        signers = {}
434
 
        fingerprint_to_authors = {}
435
 
        for rev_id, validity, fingerprint in result:
436
 
            if validity == SIGNATURE_EXPIRED:
437
 
                revision = repo.get_revision(rev_id)
438
 
                authors = ', '.join(revision.get_apparent_authors())
439
 
                signers.setdefault(fingerprint, 0)
440
 
                signers[fingerprint] += 1
441
 
                fingerprint_to_authors[fingerprint] = authors
442
 
        result = []
443
 
        for fingerprint, number in signers.items():
444
 
            result.append(ngettext(u"{0} commit by author {1} with "\
445
 
                                    "key {2} now expired", 
446
 
                                   u"{0} commits by author {1} with key {2} now "\
447
 
                                    "expired",
448
 
                                    number).format(number,
449
 
                            fingerprint_to_authors[fingerprint], fingerprint) )
450
 
        return result
451
 
 
452
 
    def valid_commits_message(self, count):
453
 
        """returns message for number of commits"""
454
 
        return gettext(u"{0} commits with valid signatures").format(
455
 
                                        count[SIGNATURE_VALID])
456
 
 
457
 
    def unknown_key_message(self, count):
458
 
        """returns message for number of commits"""
459
 
        return ngettext(u"{0} commit with unknown key",
460
 
                        u"{0} commits with unknown keys",
461
 
                        count[SIGNATURE_KEY_MISSING]).format(
462
 
                                        count[SIGNATURE_KEY_MISSING])
463
 
 
464
 
    def commit_not_valid_message(self, count):
465
 
        """returns message for number of commits"""
466
 
        return ngettext(u"{0} commit not valid",
467
 
                        u"{0} commits not valid",
468
 
                        count[SIGNATURE_NOT_VALID]).format(
469
 
                                            count[SIGNATURE_NOT_VALID])
470
 
 
471
 
    def commit_not_signed_message(self, count):
472
 
        """returns message for number of commits"""
473
 
        return ngettext(u"{0} commit not signed",
474
 
                        u"{0} commits not signed",
475
 
                        count[SIGNATURE_NOT_SIGNED]).format(
476
 
                                        count[SIGNATURE_NOT_SIGNED])
477
 
 
478
 
    def expired_commit_message(self, count):
479
 
        """returns message for number of commits"""
480
 
        return ngettext(u"{0} commit with key now expired",
481
 
                        u"{0} commits with key now expired",
482
 
                        count[SIGNATURE_EXPIRED]).format(
483
 
                                    count[SIGNATURE_EXPIRED])