1
# Copyright (C) 2005, 2011 Canonical Ltd
2
# Authors: Robert Collins <robert.collins@canonical.com>
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License as published by
6
# the Free Software Foundation; either version 2 of the License, or
7
# (at your option) any later version.
9
# This program is distributed in the hope that it will be useful,
10
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
# GNU General Public License for more details.
14
# You should have received a copy of the GNU General Public License
15
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
"""GPG signing and checking logic."""
22
from StringIO import StringIO
24
from bzrlib.lazy_import import lazy_import
25
lazy_import(globals(), """
34
from bzrlib.i18n import (
42
SIGNATURE_KEY_MISSING = 1
43
SIGNATURE_NOT_VALID = 2
44
SIGNATURE_NOT_SIGNED = 3
48
class DisabledGPGStrategy(object):
49
"""A GPG Strategy that makes everything fail."""
52
def verify_signatures_available():
55
def __init__(self, ignored):
56
"""Real strategies take a configuration."""
58
def sign(self, content):
    """Refuse to sign anything.

    :param content: the data that would have been signed; never inspected
    :raises errors.SigningFailed: unconditionally
    """
    reason = 'Signing is disabled.'
    raise errors.SigningFailed(reason)
61
def verify(self, content, testament):
62
raise errors.SignatureVerificationFailed('Signature verification is \
65
def set_acceptable_keys(self, command_line_input):
69
class LoopbackGPGStrategy(object):
70
"""A GPG Strategy that acts like 'cat' - data is just passed through.
75
def verify_signatures_available():
78
def __init__(self, ignored):
79
"""Real strategies take a configuration."""
81
def sign(self, content):
    """Wrap content in fixed pseudo-signature marker lines.

    No cryptography is involved: the content is returned unchanged between
    a BEGIN marker line and an END marker line.

    :param content: the text to "sign"
    :return: the BEGIN marker, then content, then the END marker
    """
    header = "-----BEGIN PSEUDO-SIGNED CONTENT-----\n"
    footer = "-----END PSEUDO-SIGNED CONTENT-----\n"
    return header + content + footer
85
def verify(self, content, testament):
    """Report every signature as valid without looking at it.

    :param content: the commit signature; not inspected
    :param testament: the testament string for the commit; not inspected
    :return: the pair (SIGNATURE_VALID, None)
    """
    outcome = (SIGNATURE_VALID, None)
    return outcome
88
def set_acceptable_keys(self, command_line_input):
89
if command_line_input is not None:
90
patterns = command_line_input.split(",")
91
self.acceptable_keys = []
92
for pattern in patterns:
93
if pattern == "unknown":
96
self.acceptable_keys.append(pattern)
98
def do_verifications(self, revisions, repository):
99
count = {SIGNATURE_VALID: 0,
100
SIGNATURE_KEY_MISSING: 0,
101
SIGNATURE_NOT_VALID: 0,
102
SIGNATURE_NOT_SIGNED: 0,
103
SIGNATURE_EXPIRED: 0}
105
all_verifiable = True
106
for rev_id in revisions:
107
verification_result, uid =\
108
repository.verify_revision(rev_id,self)
109
result.append([rev_id, verification_result, uid])
110
count[verification_result] += 1
111
if verification_result != SIGNATURE_VALID:
112
all_verifiable = False
113
return (count, result, all_verifiable)
115
def valid_commits_message(self, count):
    """Build the summary line for commits whose signatures were valid.

    :param count: mapping of SIGNATURE_* result code to number of commits,
        in the shape produced by do_verifications
    :return: the gettext-processed message with the valid count filled in
    """
    template = gettext(u"{0} commits with valid signatures")
    return template.format(count[SIGNATURE_VALID])
119
def unknown_key_message(self, count):
    """Build the summary line for commits signed with an unknown key.

    :param count: mapping of SIGNATURE_* result code to number of commits,
        in the shape produced by do_verifications
    :return: the singular or plural message chosen by ngettext, with the
        SIGNATURE_KEY_MISSING count filled in
    """
    missing = count[SIGNATURE_KEY_MISSING]
    template = ngettext(u"{0} commit with unknown key",
                        u"{0} commits with unknown keys",
                        missing)
    return template.format(missing)
125
def commit_not_valid_message(self, count):
    """Build the summary line for commits whose signature is not valid.

    :param count: mapping of SIGNATURE_* result code to number of commits,
        in the shape produced by do_verifications
    :return: the singular or plural message chosen by ngettext, with the
        SIGNATURE_NOT_VALID count filled in
    """
    invalid = count[SIGNATURE_NOT_VALID]
    template = ngettext(u"{0} commit not valid",
                        u"{0} commits not valid",
                        invalid)
    return template.format(invalid)
131
def commit_not_signed_message(self, count):
    """Build the summary line for commits that carry no signature.

    :param count: mapping of SIGNATURE_* result code to number of commits,
        in the shape produced by do_verifications
    :return: the singular or plural message chosen by ngettext, with the
        SIGNATURE_NOT_SIGNED count filled in
    """
    unsigned = count[SIGNATURE_NOT_SIGNED]
    template = ngettext(u"{0} commit not signed",
                        u"{0} commits not signed",
                        unsigned)
    return template.format(unsigned)
137
def expired_commit_message(self, count):
    """Build the summary line for commits signed with a now-expired key.

    :param count: mapping of SIGNATURE_* result code to number of commits,
        in the shape produced by do_verifications
    :return: the singular or plural message chosen by ngettext, with the
        SIGNATURE_EXPIRED count filled in
    """
    expired = count[SIGNATURE_EXPIRED]
    template = ngettext(u"{0} commit with key now expired",
                        u"{0} commits with key now expired",
                        expired)
    return template.format(expired)
145
tty = os.environ.get('TTY')
147
os.environ['GPG_TTY'] = tty
148
trace.mutter('setting GPG_TTY=%s', tty)
150
# This is not quite worthy of a warning, because some people
151
# don't need GPG_TTY to be set. But it is worthy of a big mark
152
# in ~/.bzr.log, so that people can debug it if it happens to them
153
trace.mutter('** Env var TTY empty, cannot set GPG_TTY.'
157
class GPGStrategy(object):
158
"""GPG Signing and checking facilities."""
160
acceptable_keys = None
163
def verify_signatures_available():
165
check if this strategy can verify signatures
167
:return: boolean if this strategy can verify signatures
172
except ImportError, error:
175
def _command_line(self):
177
return [self._config.gpg_signing_command(), '--clearsign', '-u',
178
self._config.gpg_signing_key()]
180
def __init__(self, config):
181
self._config = config
184
self.context = gpgme.Context()
185
except ImportError, error:
186
pass # can't use verify()
188
def sign(self, content):
189
if isinstance(content, unicode):
190
raise errors.BzrBadParameterUnicode('content')
191
ui.ui_factory.clear_term()
193
preexec_fn = _set_gpg_tty
194
if sys.platform == 'win32':
195
# Win32 doesn't support preexec_fn, but wouldn't support TTY anyway.
198
process = subprocess.Popen(self._command_line(),
199
stdin=subprocess.PIPE,
200
stdout=subprocess.PIPE,
201
preexec_fn=preexec_fn)
203
result = process.communicate(content)[0]
204
if process.returncode is None:
206
if process.returncode != 0:
207
raise errors.SigningFailed(self._command_line())
210
if e.errno == errno.EPIPE:
211
raise errors.SigningFailed(self._command_line())
215
# bad subprocess parameters, should never happen.
218
if e.errno == errno.ENOENT:
219
# gpg is not installed
220
raise errors.SigningFailed(self._command_line())
224
def verify(self, content, testament):
225
"""Check content has a valid signature.
227
:param content: the commit signature
228
:param testament: the valid testament string for the commit
230
:return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid
234
except ImportError, error:
235
raise errors.GpgmeNotInstalled(error)
237
signature = StringIO(content)
238
plain_output = StringIO()
241
result = self.context.verify(signature, None, plain_output)
242
except gpgme.GpgmeError,error:
243
raise errors.SignatureVerificationFailed(error[2])
245
# No result if input is invalid.
246
# test_verify_invalid()
248
return SIGNATURE_NOT_VALID, None
249
# User has specified a list of acceptable keys, check our result is in it.
250
# test_verify_unacceptable_key()
251
fingerprint = result[0].fpr
252
if self.acceptable_keys is not None:
253
if not fingerprint in self.acceptable_keys:
254
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
255
# Check the signature actually matches the testament.
256
# test_verify_bad_testament()
257
if testament != plain_output.getvalue():
258
return SIGNATURE_NOT_VALID, None
259
# Yay gpgme set the valid bit.
260
# Can't write a test for this one as you can't set a key to be
261
# trusted using gpgme.
262
if result[0].summary & gpgme.SIGSUM_VALID:
263
key = self.context.get_key(fingerprint)
264
name = key.uids[0].name
265
email = key.uids[0].email
266
return SIGNATURE_VALID, name + " <" + email + ">"
267
# Sigsum_red indicates a problem; unfortunately I have not been able
268
# to write any tests which actually set this.
269
if result[0].summary & gpgme.SIGSUM_RED:
270
return SIGNATURE_NOT_VALID, None
271
# GPG does not know this key.
272
# test_verify_unknown_key()
273
if result[0].summary & gpgme.SIGSUM_KEY_MISSING:
274
return SIGNATURE_KEY_MISSING, fingerprint[-8:]
275
# Summary isn't set if sig is valid but key is untrusted
276
# but if user has explicitly set the key as acceptable we can validate it.
277
if result[0].summary == 0 and self.acceptable_keys is not None:
278
if fingerprint in self.acceptable_keys:
279
# test_verify_untrusted_but_accepted()
280
return SIGNATURE_VALID, None
281
# test_verify_valid_but_untrusted()
282
if result[0].summary == 0 and self.acceptable_keys is None:
283
return SIGNATURE_NOT_VALID, None
284
if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED:
285
expires = self.context.get_key(result[0].fpr).subkeys[0].expires
286
if expires > result[0].timestamp:
287
# The expired key was not expired at time of signing.
288
# test_verify_expired_but_valid()
289
return SIGNATURE_EXPIRED, fingerprint[-8:]
291
# I can't work out how to create a test where the signature
292
# was expired at the time of signing.
293
return SIGNATURE_NOT_VALID, None
294
# A signature from a revoked key gets this.
295
# test_verify_revoked_signature()
296
if result[0].summary & gpgme.SIGSUM_SYS_ERROR:
297
return SIGNATURE_NOT_VALID, None
298
# Other error types such as revoked keys should (I think) be caught by
299
# SIGSUM_RED so anything else means something is buggy.
300
raise errors.SignatureVerificationFailed("Unknown GnuPG key "\
301
"verification result")
303
def set_acceptable_keys(self, command_line_input):
304
"""sets the acceptable keys for verifying with this GPGStrategy
306
:param command_line_input: comma separated list of patterns from
311
acceptable_keys_config = self._config.acceptable_keys()
313
if isinstance(acceptable_keys_config, unicode):
314
acceptable_keys_config = str(acceptable_keys_config)
315
except UnicodeEncodeError:
316
#gpg Context.keylist(pattern) does not like unicode
317
raise errors.BzrCommandError('Only ASCII permitted in option names')
319
if acceptable_keys_config is not None:
320
key_patterns = acceptable_keys_config
321
if command_line_input is not None: #command line overrides config
322
key_patterns = command_line_input
323
if key_patterns is not None:
324
patterns = key_patterns.split(",")
326
self.acceptable_keys = []
327
for pattern in patterns:
328
result = self.context.keylist(pattern)
332
self.acceptable_keys.append(key.subkeys[0].fpr)
333
trace.mutter("Added acceptable key: " + key.subkeys[0].fpr)
336
"No GnuPG key results for pattern: {0}"
339
def do_verifications(self, revisions, repository,
340
process_events_callback=None):
341
"""do verifications on a set of revisions
343
:param revisions: list of revision ids to verify
344
:param repository: repository object
345
:param process_events_callback: method to call for GUI frontends that
346
want to keep their UI refreshed
348
:return: count dictionary of results of each type,
349
result list for each revision,
350
boolean True if all results are verified successfully
352
count = {SIGNATURE_VALID: 0,
353
SIGNATURE_KEY_MISSING: 0,
354
SIGNATURE_NOT_VALID: 0,
355
SIGNATURE_NOT_SIGNED: 0,
356
SIGNATURE_EXPIRED: 0}
358
all_verifiable = True
359
for rev_id in revisions:
360
verification_result, uid =\
361
repository.verify_revision(rev_id,self)
362
result.append([rev_id, verification_result, uid])
363
count[verification_result] += 1
364
if verification_result != SIGNATURE_VALID:
365
all_verifiable = False
366
if process_events_callback is not None:
367
process_events_callback()
368
return (count, result, all_verifiable)
370
def verbose_valid_message(self, result):
371
"""takes a verify result and returns list of signed commits strings"""
373
for rev_id, validity, uid in result:
374
if validity == SIGNATURE_VALID:
375
signers.setdefault(uid, 0)
378
for uid, number in signers.items():
379
result.append( ngettext(u"{0} signed {1} commit",
380
u"{0} signed {1} commits",
381
number).format(uid, number) )
385
def verbose_not_valid_message(self, result, repo):
386
"""takes a verify result and returns list of not valid commit info"""
388
for rev_id, validity, empty in result:
389
if validity == SIGNATURE_NOT_VALID:
390
revision = repo.get_revision(rev_id)
391
authors = ', '.join(revision.get_apparent_authors())
392
signers.setdefault(authors, 0)
393
signers[authors] += 1
395
for authors, number in signers.items():
396
result.append( ngettext(u"{0} commit by author {1}",
397
u"{0} commits by author {1}",
398
number).format(number, authors) )
401
def verbose_not_signed_message(self, result, repo):
402
"""takes a verify result and returns list of not signed commit info"""
404
for rev_id, validity, empty in result:
405
if validity == SIGNATURE_NOT_SIGNED:
406
revision = repo.get_revision(rev_id)
407
authors = ', '.join(revision.get_apparent_authors())
408
signers.setdefault(authors, 0)
409
signers[authors] += 1
411
for authors, number in signers.items():
412
result.append( ngettext(u"{0} commit by author {1}",
413
u"{0} commits by author {1}",
414
number).format(number, authors) )
417
def verbose_missing_key_message(self, result):
418
"""takes a verify result and returns list of missing key info"""
420
for rev_id, validity, fingerprint in result:
421
if validity == SIGNATURE_KEY_MISSING:
422
signers.setdefault(fingerprint, 0)
423
signers[fingerprint] += 1
425
for fingerprint, number in signers.items():
426
result.append( ngettext(u"Unknown key {0} signed {1} commit",
427
u"Unknown key {0} signed {1} commits",
428
number).format(fingerprint, number) )
431
def verbose_expired_key_message(self, result, repo):
432
"""takes a verify result and returns list of expired key info"""
434
fingerprint_to_authors = {}
435
for rev_id, validity, fingerprint in result:
436
if validity == SIGNATURE_EXPIRED:
437
revision = repo.get_revision(rev_id)
438
authors = ', '.join(revision.get_apparent_authors())
439
signers.setdefault(fingerprint, 0)
440
signers[fingerprint] += 1
441
fingerprint_to_authors[fingerprint] = authors
443
for fingerprint, number in signers.items():
444
result.append(ngettext(u"{0} commit by author {1} with "\
445
"key {2} now expired",
446
u"{0} commits by author {1} with key {2} now "\
448
number).format(number,
449
fingerprint_to_authors[fingerprint], fingerprint) )
452
def valid_commits_message(self, count):
    """Return the summary line for commits with valid signatures.

    :param count: dictionary keyed by SIGNATURE_* result code giving the
        number of commits with that result, as returned by do_verifications
    :return: the gettext-processed message with the SIGNATURE_VALID count
        substituted in
    """
    message = gettext(u"{0} commits with valid signatures")
    return message.format(count[SIGNATURE_VALID])
457
def unknown_key_message(self, count):
    """Return the summary line for commits signed with an unknown key.

    :param count: dictionary keyed by SIGNATURE_* result code giving the
        number of commits with that result, as returned by do_verifications
    :return: singular or plural message (selected via ngettext) with the
        SIGNATURE_KEY_MISSING count substituted in
    """
    number = count[SIGNATURE_KEY_MISSING]
    message = ngettext(u"{0} commit with unknown key",
                       u"{0} commits with unknown keys",
                       number)
    return message.format(number)
464
def commit_not_valid_message(self, count):
    """Return the summary line for commits whose signature is not valid.

    :param count: dictionary keyed by SIGNATURE_* result code giving the
        number of commits with that result, as returned by do_verifications
    :return: singular or plural message (selected via ngettext) with the
        SIGNATURE_NOT_VALID count substituted in
    """
    number = count[SIGNATURE_NOT_VALID]
    message = ngettext(u"{0} commit not valid",
                       u"{0} commits not valid",
                       number)
    return message.format(number)
471
def commit_not_signed_message(self, count):
    """Return the summary line for commits that are not signed.

    :param count: dictionary keyed by SIGNATURE_* result code giving the
        number of commits with that result, as returned by do_verifications
    :return: singular or plural message (selected via ngettext) with the
        SIGNATURE_NOT_SIGNED count substituted in
    """
    number = count[SIGNATURE_NOT_SIGNED]
    message = ngettext(u"{0} commit not signed",
                       u"{0} commits not signed",
                       number)
    return message.format(number)
478
def expired_commit_message(self, count):
    """Return the summary line for commits signed with a now-expired key.

    :param count: dictionary keyed by SIGNATURE_* result code giving the
        number of commits with that result, as returned by do_verifications
    :return: singular or plural message (selected via ngettext) with the
        SIGNATURE_EXPIRED count substituted in
    """
    number = count[SIGNATURE_EXPIRED]
    message = ngettext(u"{0} commit with key now expired",
                       u"{0} commits with key now expired",
                       number)
    return message.format(number)