~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/gpg.py

  • Committer: Ian Clatworthy
  • Date: 2007-12-11 02:07:30 UTC
  • mto: (3119.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 3120.
  • Revision ID: ian.clatworthy@internode.on.net-20071211020730-sdj4kj794dw0628e
make help topics more discoverable

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2011 Canonical Ltd
 
1
# Copyright (C) 2005 Canonical Ltd
2
2
#   Authors: Robert Collins <robert.collins@canonical.com>
3
3
#
4
4
# This program is free software; you can redistribute it and/or modify
13
13
#
14
14
# You should have received a copy of the GNU General Public License
15
15
# along with this program; if not, write to the Free Software
16
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17
17
 
18
18
"""GPG signing and checking logic."""
19
19
 
20
20
import os
21
21
import sys
22
 
from StringIO import StringIO
23
22
 
24
23
from bzrlib.lazy_import import lazy_import
25
24
lazy_import(globals(), """
33
32
    )
34
33
""")
35
34
 
36
 
class i18n:
    """Stub standing in for bzrlib.i18n until that module is ready to use.

    Both entry points hand back one of their arguments untouched.
    """

    @staticmethod
    def gettext(string):
        """Return ``string`` unchanged."""
        return string

    @staticmethod
    def ngettext(single, plural, number):
        """Return ``single`` when ``number`` equals 1, otherwise ``plural``."""
        return single if number == 1 else plural
49
 
 
50
 
# Verification results: the codes returned by the strategies' verify().
(SIGNATURE_VALID,
 SIGNATURE_KEY_MISSING,
 SIGNATURE_NOT_VALID,
 SIGNATURE_NOT_SIGNED) = range(4)
55
 
 
56
35
 
57
36
class DisabledGPGStrategy(object):
    """A GPG Strategy that makes everything fail."""

    def __init__(self, ignored):
        """Real strategies take a configuration."""

    @staticmethod
    def verify_signatures_available():
        """Always report signature verification as available."""
        return True

    def sign(self, content):
        """Refuse to sign: always raises errors.SigningFailed."""
        reason = 'Signing is disabled.'
        raise errors.SigningFailed(reason)

    def verify(self, content, testament):
        """Refuse to verify: always raises SignatureVerificationFailed."""
        reason = 'Signature verification is disabled.'
        raise errors.SignatureVerificationFailed(reason)

    def set_acceptable_keys(self, command_line_input):
        """Accept the argument and do nothing with it."""
        return None
76
 
 
77
45
 
78
46
class LoopbackGPGStrategy(object):
79
47
    """A GPG Strategy that acts like 'cat' - data is just passed through."""
80
48
 
81
 
    @staticmethod
82
 
    def verify_signatures_available():
83
 
        return True
84
 
 
85
49
    def __init__(self, ignored):
86
50
        """Real strategies take a configuration."""
87
51
 
89
53
        return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content +
90
54
                "-----END PSEUDO-SIGNED CONTENT-----\n")
91
55
 
92
 
    def verify(self, content, testament):
93
 
        return SIGNATURE_VALID, None
94
 
 
95
 
    def set_acceptable_keys(self, command_line_input):
96
 
        if command_line_input is not None:
97
 
            patterns = command_line_input.split(",")
98
 
            self.acceptable_keys = []
99
 
            for pattern in patterns:
100
 
                if pattern == "unknown":
101
 
                    pass
102
 
                else:
103
 
                    self.acceptable_keys.append(pattern)
104
 
 
105
 
    def do_verifications(self, revisions, repository):
106
 
        count = {SIGNATURE_VALID: 0,
107
 
                 SIGNATURE_KEY_MISSING: 0,
108
 
                 SIGNATURE_NOT_VALID: 0,
109
 
                 SIGNATURE_NOT_SIGNED: 0}
110
 
        result = []
111
 
        all_verifiable = True
112
 
        for rev_id in revisions:
113
 
            verification_result, uid =\
114
 
                                repository.verify_revision(rev_id,self)
115
 
            result.append([rev_id, verification_result, uid])
116
 
            count[verification_result] += 1
117
 
            if verification_result != SIGNATURE_VALID:
118
 
                all_verifiable = False
119
 
        return (count, result, all_verifiable)
120
 
 
121
 
    def valid_commits_message(self, count):
122
 
        return i18n.gettext("{0} commits with valid signatures").format(
123
 
                                        count[SIGNATURE_VALID])            
124
 
 
125
 
    def unknown_key_message(self, count):
126
 
        return i18n.ngettext("{0} commit with unknown key",
127
 
                             "{0} commits with unknown keys",
128
 
                             count[SIGNATURE_KEY_MISSING]).format(
129
 
                                        count[SIGNATURE_KEY_MISSING])
130
 
 
131
 
    def commit_not_valid_message(self, count):
132
 
        return i18n.ngettext("{0} commit not valid",
133
 
                             "{0} commits not valid",
134
 
                             count[SIGNATURE_NOT_VALID]).format(
135
 
                                            count[SIGNATURE_NOT_VALID])
136
 
 
137
 
    def commit_not_signed_message(self, count):
138
 
        return i18n.ngettext("{0} commit not signed",
139
 
                             "{0} commits not signed",
140
 
                             count[SIGNATURE_NOT_SIGNED]).format(
141
 
                                        count[SIGNATURE_NOT_SIGNED])
142
 
 
143
56
 
144
57
def _set_gpg_tty():
145
58
    tty = os.environ.get('TTY')
156
69
 
157
70
class GPGStrategy(object):
158
71
    """GPG Signing and checking facilities."""
159
 
 
160
 
    acceptable_keys = None
161
 
 
162
 
    @staticmethod
163
 
    def verify_signatures_available():
164
 
        try:
165
 
            import gpgme
166
 
            return True
167
 
        except ImportError, error:
168
 
            return False
169
 
 
 
72
        
170
73
    def _command_line(self):
171
74
        return [self._config.gpg_signing_command(), '--clearsign']
172
75
 
173
76
    def __init__(self, config):
174
77
        self._config = config
175
 
        try:
176
 
            import gpgme
177
 
            self.context = gpgme.Context()
178
 
        except ImportError, error:
179
 
            pass # can't use verify()
180
78
 
181
79
    def sign(self, content):
182
80
        if isinstance(content, unicode):
213
111
                raise errors.SigningFailed(self._command_line())
214
112
            else:
215
113
                raise
216
 
 
217
 
    def verify(self, content, testament):
218
 
        """Check content has a valid signature.
219
 
        
220
 
        :param content: the commit signature
221
 
        :param testament: the valid testament string for the commit
222
 
        
223
 
        :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
224
 
        """
225
 
        try:
226
 
            import gpgme
227
 
        except ImportError, error:
228
 
            raise errors.GpgmeNotInstalled(error)
229
 
 
230
 
        signature = StringIO(content)
231
 
        plain_output = StringIO()
232
 
        
233
 
        try:
234
 
            result = self.context.verify(signature, None, plain_output)
235
 
        except gpgme.GpgmeError,error:
236
 
            raise errors.SignatureVerificationFailed(error[2])
237
 
 
238
 
        if len(result) == 0:
239
 
            return SIGNATURE_NOT_VALID, None
240
 
        fingerprint = result[0].fpr
241
 
        if self.acceptable_keys is not None:
242
 
            if not fingerprint in self.acceptable_keys:
243
 
                return SIGNATURE_KEY_MISSING, fingerprint[-8:]
244
 
        if testament != plain_output.getvalue():
245
 
            return SIGNATURE_NOT_VALID, None
246
 
        if result[0].summary & gpgme.SIGSUM_VALID:
247
 
            key = self.context.get_key(fingerprint)
248
 
            name = key.uids[0].name
249
 
            email = key.uids[0].email
250
 
            return SIGNATURE_VALID, name + " <" + email + ">"
251
 
        if result[0].summary & gpgme.SIGSUM_RED:
252
 
            return SIGNATURE_NOT_VALID, None
253
 
        if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
254
 
            return SIGNATURE_KEY_MISSING, fingerprint[-8:]
255
 
        #summary isn't set if sig is valid but key is untrusted
256
 
        if result[0].summary == 0 and self.acceptable_keys is not None:
257
 
            if fingerprint in self.acceptable_keys:
258
 
                return SIGNATURE_VALID, None
259
 
        else:
260
 
            return SIGNATURE_KEY_MISSING, None
261
 
        raise errors.SignatureVerificationFailed("Unknown GnuPG key "\
262
 
                                                 "verification result")
263
 
 
264
 
    def set_acceptable_keys(self, command_line_input):
265
 
        """sets the acceptable keys for verifying with this GPGStrategy
266
 
        
267
 
        :param command_line_input: comma separated list of patterns from
268
 
                                command line
269
 
        :return: nothing
270
 
        """
271
 
        key_patterns = None
272
 
        acceptable_keys_config = self._config.acceptable_keys()
273
 
        try:
274
 
            if isinstance(acceptable_keys_config, unicode):
275
 
                acceptable_keys_config = str(acceptable_keys_config)
276
 
        except UnicodeEncodeError:
277
 
            raise errors.BzrCommandError('Only ASCII permitted in option names')
278
 
 
279
 
        if acceptable_keys_config is not None:
280
 
            key_patterns = acceptable_keys_config
281
 
        if command_line_input is not None: #command line overrides config
282
 
            key_patterns = command_line_input
283
 
        if key_patterns is not None:
284
 
            patterns = key_patterns.split(",")
285
 
 
286
 
            self.acceptable_keys = []
287
 
            for pattern in patterns:
288
 
                result = self.context.keylist(pattern)
289
 
                found_key = False
290
 
                for key in result:
291
 
                    found_key = True
292
 
                    self.acceptable_keys.append(key.subkeys[0].fpr)
293
 
                    trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
294
 
                if not found_key:
295
 
                    trace.note(i18n.gettext(
296
 
                            "No GnuPG key results for pattern: {}"
297
 
                                ).format(pattern))
298
 
 
299
 
    def do_verifications(self, revisions, repository):
300
 
        """do verifications on a set of revisions
301
 
        
302
 
        :param revisions: list of revision ids to verify
303
 
        :param repository: repository object
304
 
        
305
 
        :return: count dictionary of results of each type,
306
 
                 result list for each revision,
307
 
                 boolean True if all results are verified successfully
308
 
        """
309
 
        count = {SIGNATURE_VALID: 0,
310
 
                 SIGNATURE_KEY_MISSING: 0,
311
 
                 SIGNATURE_NOT_VALID: 0,
312
 
                 SIGNATURE_NOT_SIGNED: 0}
313
 
        result = []
314
 
        all_verifiable = True
315
 
        for rev_id in revisions:
316
 
            verification_result, uid =\
317
 
                                repository.verify_revision(rev_id,self)
318
 
            result.append([rev_id, verification_result, uid])
319
 
            count[verification_result] += 1
320
 
            if verification_result != SIGNATURE_VALID:
321
 
                all_verifiable = False
322
 
        return (count, result, all_verifiable)
323
 
 
324
 
    def verbose_valid_message(self, result):
325
 
        """takes a verify result and returns list of signed commits strings"""
326
 
        signers = {}
327
 
        for rev_id, validity, uid in result:
328
 
            if validity == SIGNATURE_VALID:
329
 
                signers.setdefault(uid, 0)
330
 
                signers[uid] += 1
331
 
        result = []
332
 
        for uid, number in signers.items():
333
 
             result.append( i18n.ngettext("{0} signed {1} commit", 
334
 
                             "{0} signed {1} commits",
335
 
                             number).format(uid, number) )
336
 
        return result
337
 
 
338
 
 
339
 
    def verbose_not_valid_message(self, result, repo):
340
 
        """takes a verify result and returns list of not valid commit info"""
341
 
        signers = {}
342
 
        for rev_id, validity, empty in result:
343
 
            if validity == SIGNATURE_NOT_VALID:
344
 
                revision = repo.get_revision(rev_id)
345
 
                authors = ', '.join(revision.get_apparent_authors())
346
 
                signers.setdefault(authors, 0)
347
 
                signers[authors] += 1
348
 
        result = []
349
 
        for authors, number in signers.items():
350
 
            result.append( i18n.ngettext("{0} commit by author {1}", 
351
 
                                 "{0} commits by author {1}",
352
 
                                 number).format(number, authors) )
353
 
        return result
354
 
 
355
 
    def verbose_not_signed_message(self, result, repo):
356
 
        """takes a verify result and returns list of not signed commit info"""
357
 
        signers = {}
358
 
        for rev_id, validity, empty in result:
359
 
            if validity == SIGNATURE_NOT_SIGNED:
360
 
                revision = repo.get_revision(rev_id)
361
 
                authors = ', '.join(revision.get_apparent_authors())
362
 
                signers.setdefault(authors, 0)
363
 
                signers[authors] += 1
364
 
        result = []
365
 
        for authors, number in signers.items():
366
 
            result.append( i18n.ngettext("{0} commit by author {1}", 
367
 
                                 "{0} commits by author {1}",
368
 
                                 number).format(number, authors) )
369
 
        return result
370
 
 
371
 
    def verbose_missing_key_message(self, result):
372
 
        """takes a verify result and returns list of missing key info"""
373
 
        signers = {}
374
 
        for rev_id, validity, fingerprint in result:
375
 
            if validity == SIGNATURE_KEY_MISSING:
376
 
                signers.setdefault(fingerprint, 0)
377
 
                signers[fingerprint] += 1
378
 
        result = []
379
 
        for fingerprint, number in signers.items():
380
 
            result.append( i18n.ngettext("Unknown key {0} signed {1} commit", 
381
 
                                 "Unknown key {0} signed {1} commits",
382
 
                                 number).format(fingerprint, number) )
383
 
        return result
384
 
 
385
 
    def valid_commits_message(self, count):
386
 
        """returns message for number of commits"""
387
 
        return i18n.gettext("{0} commits with valid signatures").format(
388
 
                                        count[SIGNATURE_VALID])
389
 
 
390
 
    def unknown_key_message(self, count):
391
 
        """returns message for number of commits"""
392
 
        return i18n.ngettext("{0} commit with unknown key",
393
 
                             "{0} commits with unknown keys",
394
 
                             count[SIGNATURE_KEY_MISSING]).format(
395
 
                                        count[SIGNATURE_KEY_MISSING])
396
 
 
397
 
    def commit_not_valid_message(self, count):
398
 
        """returns message for number of commits"""
399
 
        return i18n.ngettext("{0} commit not valid",
400
 
                             "{0} commits not valid",
401
 
                             count[SIGNATURE_NOT_VALID]).format(
402
 
                                            count[SIGNATURE_NOT_VALID])
403
 
 
404
 
    def commit_not_signed_message(self, count):
405
 
        """returns message for number of commits"""
406
 
        return i18n.ngettext("{0} commit not signed",
407
 
                             "{0} commits not signed",
408
 
                             count[SIGNATURE_NOT_SIGNED]).format(
409
 
                                        count[SIGNATURE_NOT_SIGNED])