~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smtp_connection.py

  • Committer: Martin Packman
  • Date: 2011-12-23 19:38:22 UTC
  • mto: This revision was merged to the branch mainline in revision 6405.
  • Revision ID: martin.packman@canonical.com-20111223193822-hesheea4o8aqwexv
Accept and document passing the medium rather than transport for smart connections

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2007 Canonical Ltd
 
1
# Copyright (C) 2007, 2009, 2010, 2011 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
from cStringIO import StringIO
18
17
from email.Message import Message
19
 
 
20
 
from bzrlib import config
21
 
from bzrlib.errors import NoDestinationAddress
22
 
from bzrlib.tests import TestCase
23
 
from bzrlib.smtp_connection import SMTPConnection
24
 
 
25
 
 
26
 
class TestSMTPConnection(TestCase):
27
 
 
28
 
    def get_connection(self, text):
29
 
        my_config = config.GlobalConfig()
30
 
        config_file = StringIO(text)
31
 
        my_config._get_parser(config_file)
32
 
        return SMTPConnection(my_config)
 
18
import errno
 
19
import smtplib
 
20
import socket
 
21
 
 
22
from bzrlib import (
 
23
    config,
 
24
    email_message,
 
25
    errors,
 
26
    smtp_connection,
 
27
    tests,
 
28
    ui,
 
29
    )
 
30
 
 
31
 
 
32
def connection_refuser():
    """Build an ``smtplib.SMTP`` object whose ``connect`` always refuses.

    The returned object is a real, unconnected ``smtplib.SMTP`` instance
    with its ``connect`` attribute replaced, so it can be handed out by an
    SMTP factory when testing how refused connections are reported.

    :return: an ``smtplib.SMTP`` instance whose ``connect`` raises
        ``socket.error`` carrying ``errno.ECONNREFUSED``.
    """
    def connect(server='localhost', *args, **kwargs):
        # Tolerate the optional arguments smtplib.SMTP.connect accepts
        # (port, source address, ...) so that every call style ends in the
        # refusal rather than a TypeError from a too-narrow stub signature.
        raise socket.error(errno.ECONNREFUSED, 'Connection Refused')
    smtp = smtplib.SMTP()
    # Set on the instance, so it is called as a plain function (no self).
    smtp.connect = connect
    return smtp
 
38
 
 
39
 
 
40
class StubSMTPFactory(object):
    """A fake SMTP connection to test the connection setup.

    The object doubles as its own factory: calling it returns the same
    instance. Every command is appended to ``_calls`` as a tuple, so a test
    can assert which SMTP commands were issued and in what order.
    """

    def __init__(self, fail_on=None, smtp_features=None):
        """Create the stub.

        :param fail_on: names of the commands ('helo', 'ehlo', 'starttls')
            that should answer with a 500 code instead of 200.
        :param smtp_features: extension names that ``has_extn`` reports as
            supported once an EHLO has succeeded.
        """
        self._fail_on = fail_on or []
        self._calls = []
        self._smtp_features = smtp_features or []
        self._ehlo_called = False

    def __call__(self):
        # The factory pretends to be a connection
        return self

    def connect(self, server):
        """Record the connection attempt."""
        self._calls.append(('connect', server))

    def helo(self):
        """Record a HELO and return a ``(code, message)`` pair."""
        self._calls.append(('helo',))
        if 'helo' in self._fail_on:
            return 500, 'helo failure'
        return 200, 'helo success'

    def ehlo(self):
        """Record an EHLO and return a ``(code, message)`` pair.

        A successful EHLO is what makes the configured features visible
        through ``has_extn``.
        """
        self._calls.append(('ehlo',))
        if 'ehlo' in self._fail_on:
            return 500, 'ehlo failure'
        self._ehlo_called = True
        return 200, 'ehlo success'

    def has_extn(self, extension):
        """Record the query; true only after EHLO and for a known feature."""
        self._calls.append(('has_extn', extension))
        return self._ehlo_called and extension in self._smtp_features

    def starttls(self):
        """Record a STARTTLS and return a ``(code, message)`` pair."""
        self._calls.append(('starttls',))
        if 'starttls' in self._fail_on:
            return 500, 'starttls failure'
        # As with smtplib.SMTP.starttls, what the earlier EHLO revealed is
        # forgotten: the features must be requested again with a new EHLO
        # before has_extn() answers positively.
        self._ehlo_called = False
        return 200, 'starttls success'
 
81
 
 
82
 
 
83
class WideOpenSMTPFactory(StubSMTPFactory):
    """A fake smtp server whose login step lets every user in."""

    def login(self, user, password):
        """Note the credentials in ``_calls``; nobody is ever rejected."""
        call = ('login', user, password)
        self._calls.append(call)
 
88
 
 
89
 
 
90
class TestSMTPConnection(tests.TestCaseInTempDir):
 
91
 
 
92
    def get_connection(self, text, smtp_factory=None):
 
93
        my_config = config.GlobalConfig.from_string(text)
 
94
        return smtp_connection.SMTPConnection(my_config,
 
95
                                              _smtp_factory=smtp_factory)
33
96
 
34
97
    def test_defaults(self):
35
98
        conn = self.get_connection('')
41
104
        conn = self.get_connection('[DEFAULT]\nsmtp_server=host:10\n')
42
105
        self.assertEqual('host:10', conn._smtp_server)
43
106
 
 
107
    def test_missing_server(self):
 
108
        conn = self.get_connection('', smtp_factory=connection_refuser)
 
109
        self.assertRaises(errors.DefaultSMTPConnectionRefused, conn._connect)
 
110
        conn = self.get_connection('[DEFAULT]\nsmtp_server=smtp.example.com\n',
 
111
                                   smtp_factory=connection_refuser)
 
112
        self.assertRaises(errors.SMTPConnectionRefused, conn._connect)
 
113
 
44
114
    def test_smtp_username(self):
45
115
        conn = self.get_connection('')
46
116
        self.assertIs(None, conn._smtp_username)
48
118
        conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
49
119
        self.assertEqual(u'joebody', conn._smtp_username)
50
120
 
51
 
    def test_smtp_password(self):
 
121
    def test_smtp_password_from_config(self):
52
122
        conn = self.get_connection('')
53
123
        self.assertIs(None, conn._smtp_password)
54
124
 
55
125
        conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
56
126
        self.assertEqual(u'mypass', conn._smtp_password)
57
127
 
 
128
    def test_smtp_password_from_user(self):
 
129
        user = 'joe'
 
130
        password = 'hispass'
 
131
        factory = WideOpenSMTPFactory()
 
132
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
 
133
                                   smtp_factory=factory)
 
134
        self.assertIs(None, conn._smtp_password)
 
135
 
 
136
        ui.ui_factory = ui.CannedInputUIFactory([password])
 
137
        conn._connect()
 
138
        self.assertEqual(password, conn._smtp_password)
 
139
 
 
140
    def test_smtp_password_from_auth_config(self):
 
141
        user = 'joe'
 
142
        password = 'hispass'
 
143
        factory = WideOpenSMTPFactory()
 
144
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
 
145
                                   smtp_factory=factory)
 
146
        self.assertEqual(user, conn._smtp_username)
 
147
        self.assertIs(None, conn._smtp_password)
 
148
        # Create a config file with the right password
 
149
        conf = config.AuthenticationConfig()
 
150
        conf._get_config().update({'smtptest':
 
151
                                       {'scheme': 'smtp', 'user':user,
 
152
                                        'password': password}})
 
153
        conf._save()
 
154
 
 
155
        conn._connect()
 
156
        self.assertEqual(password, conn._smtp_password)
 
157
 
 
158
    def test_authenticate_with_byte_strings(self):
 
159
        user = 'joe'
 
160
        unicode_pass = u'h\xECspass'
 
161
        utf8_pass = unicode_pass.encode('utf-8')
 
162
        factory = WideOpenSMTPFactory()
 
163
        conn = self.get_connection(
 
164
            u'[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
 
165
            % (user, unicode_pass), smtp_factory=factory)
 
166
        self.assertEqual(unicode_pass, conn._smtp_password)
 
167
        conn._connect()
 
168
        self.assertEqual([('connect', 'localhost'),
 
169
                          ('ehlo',),
 
170
                          ('has_extn', 'starttls'),
 
171
                          ('login', user, utf8_pass)], factory._calls)
 
172
        smtp_username, smtp_password = factory._calls[-1][1:]
 
173
        self.assertIsInstance(smtp_username, str)
 
174
        self.assertIsInstance(smtp_password, str)
 
175
 
 
176
    def test_create_connection(self):
 
177
        factory = StubSMTPFactory()
 
178
        conn = self.get_connection('', smtp_factory=factory)
 
179
        conn._create_connection()
 
180
        self.assertEqual([('connect', 'localhost'),
 
181
                          ('ehlo',),
 
182
                          ('has_extn', 'starttls')], factory._calls)
 
183
 
 
184
    def test_create_connection_ehlo_fails(self):
 
185
        # Check that we call HELO if EHLO failed.
 
186
        factory = StubSMTPFactory(fail_on=['ehlo'])
 
187
        conn = self.get_connection('', smtp_factory=factory)
 
188
        conn._create_connection()
 
189
        self.assertEqual([('connect', 'localhost'),
 
190
                          ('ehlo',),
 
191
                          ('helo',),
 
192
                          ('has_extn', 'starttls')], factory._calls)
 
193
 
 
194
    def test_create_connection_ehlo_helo_fails(self):
 
195
        # Check that we raise an exception if both EHLO and HELO fail.
 
196
        factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
 
197
        conn = self.get_connection('', smtp_factory=factory)
 
198
        self.assertRaises(errors.SMTPError, conn._create_connection)
 
199
        self.assertEqual([('connect', 'localhost'),
 
200
                          ('ehlo',),
 
201
                          ('helo',)], factory._calls)
 
202
 
 
203
    def test_create_connection_starttls(self):
 
204
        # Check that STARTTLS plus a second EHLO are called if the
 
205
        # server says it supports the feature.
 
206
        factory = StubSMTPFactory(smtp_features=['starttls'])
 
207
        conn = self.get_connection('', smtp_factory=factory)
 
208
        conn._create_connection()
 
209
        self.assertEqual([('connect', 'localhost'),
 
210
                          ('ehlo',),
 
211
                          ('has_extn', 'starttls'),
 
212
                          ('starttls',),
 
213
                          ('ehlo',)], factory._calls)
 
214
 
 
215
    def test_create_connection_starttls_fails(self):
 
216
        # Check that we raise an exception if the server claims to
 
217
        # support STARTTLS, but then fails when we try to activate it.
 
218
        factory = StubSMTPFactory(fail_on=['starttls'],
 
219
                                  smtp_features=['starttls'])
 
220
        conn = self.get_connection('', smtp_factory=factory)
 
221
        self.assertRaises(errors.SMTPError, conn._create_connection)
 
222
        self.assertEqual([('connect', 'localhost'),
 
223
                          ('ehlo',),
 
224
                          ('has_extn', 'starttls'),
 
225
                          ('starttls',)], factory._calls)
 
226
 
58
227
    def test_get_message_addresses(self):
59
228
        msg = Message()
60
229
 
61
 
        from_, to = SMTPConnection.get_message_addresses(msg)
 
230
        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
62
231
        self.assertEqual('', from_)
63
232
        self.assertEqual([], to)
64
233
 
67
236
        msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
68
237
        msg['Bcc'] = 'user@localhost'
69
238
 
70
 
        from_, to = SMTPConnection.get_message_addresses(msg)
 
239
        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
 
240
        self.assertEqual('jrandom@example.com', from_)
 
241
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
 
242
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))
 
243
 
 
244
        # now with bzrlib's EmailMessage
 
245
        msg = email_message.EmailMessage(
 
246
            '"J. Random Developer" <jrandom@example.com>',
 
247
            ['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
 
248
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost' ],
 
249
            'subject')
 
250
 
 
251
        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
71
252
        self.assertEqual('jrandom@example.com', from_)
72
253
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
73
254
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))
79
260
 
80
261
        msg = Message()
81
262
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
82
 
        self.assertRaises(NoDestinationAddress,
83
 
                SMTPConnection(FakeConfig()).send_email, msg)
 
263
        self.assertRaises(
 
264
            errors.NoDestinationAddress,
 
265
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)
 
266
 
 
267
        msg = email_message.EmailMessage('from@from.com', '', 'subject')
 
268
        self.assertRaises(
 
269
            errors.NoDestinationAddress,
 
270
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)
 
271
 
 
272
        msg = email_message.EmailMessage('from@from.com', [], 'subject')
 
273
        self.assertRaises(
 
274
            errors.NoDestinationAddress,
 
275
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)