13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17
from cStringIO import StringIO
18
17
from email.Message import Message
20
from bzrlib import config
21
from bzrlib.errors import NoDestinationAddress
22
from bzrlib.tests import TestCase
23
from bzrlib.smtp_connection import SMTPConnection
26
class TestSMTPConnection(TestCase):
28
def get_connection(self, text):
29
my_config = config.GlobalConfig()
30
config_file = StringIO(text)
31
my_config._get_parser(config_file)
32
return SMTPConnection(my_config)
32
def connection_refuser():
34
raise socket.error(errno.ECONNREFUSED, 'Connection Refused')
36
smtp.connect = connect
40
class StubSMTPFactory(object):
41
"""A fake SMTP connection to test the connection setup."""
42
def __init__(self, fail_on=None, smtp_features=None):
43
self._fail_on = fail_on or []
45
self._smtp_features = smtp_features or []
46
self._ehlo_called = False
49
# The factory pretends to be a connection
52
def connect(self, server):
53
self._calls.append(('connect', server))
56
self._calls.append(('helo',))
57
if 'helo' in self._fail_on:
58
return 500, 'helo failure'
60
return 200, 'helo success'
63
self._calls.append(('ehlo',))
64
if 'ehlo' in self._fail_on:
65
return 500, 'ehlo failure'
67
self._ehlo_called = True
68
return 200, 'ehlo success'
70
def has_extn(self, extension):
71
self._calls.append(('has_extn', extension))
72
return self._ehlo_called and extension in self._smtp_features
75
self._calls.append(('starttls',))
76
if 'starttls' in self._fail_on:
77
return 500, 'starttls failure'
79
self._ehlo_called = True
80
return 200, 'starttls success'
83
class WideOpenSMTPFactory(StubSMTPFactory):
84
"""A fake smtp server that implements login by accepting anybody."""
86
def login(self, user, password):
87
self._calls.append(('login', user, password))
90
class TestSMTPConnection(tests.TestCaseInTempDir):
92
def get_connection(self, text, smtp_factory=None):
93
my_config = config.GlobalConfig.from_string(text)
94
return smtp_connection.SMTPConnection(my_config,
95
_smtp_factory=smtp_factory)
34
97
def test_defaults(self):
35
98
conn = self.get_connection('')
48
118
conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
49
119
self.assertEqual(u'joebody', conn._smtp_username)
51
def test_smtp_password(self):
121
def test_smtp_password_from_config(self):
52
122
conn = self.get_connection('')
53
123
self.assertIs(None, conn._smtp_password)
55
125
conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
56
126
self.assertEqual(u'mypass', conn._smtp_password)
128
def test_smtp_password_from_user(self):
131
factory = WideOpenSMTPFactory()
132
conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
133
smtp_factory=factory)
134
self.assertIs(None, conn._smtp_password)
136
ui.ui_factory = ui.CannedInputUIFactory([password])
138
self.assertEqual(password, conn._smtp_password)
140
def test_smtp_password_from_auth_config(self):
143
factory = WideOpenSMTPFactory()
144
conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
145
smtp_factory=factory)
146
self.assertEqual(user, conn._smtp_username)
147
self.assertIs(None, conn._smtp_password)
148
# Create a config file with the right password
149
conf = config.AuthenticationConfig()
150
conf._get_config().update({'smtptest':
151
{'scheme': 'smtp', 'user':user,
152
'password': password}})
156
self.assertEqual(password, conn._smtp_password)
158
def test_authenticate_with_byte_strings(self):
160
unicode_pass = u'h\xECspass'
161
utf8_pass = unicode_pass.encode('utf-8')
162
factory = WideOpenSMTPFactory()
163
conn = self.get_connection(
164
u'[DEFAULT]\nsmtp_username=%s\nsmtp_password=%s\n'
165
% (user, unicode_pass), smtp_factory=factory)
166
self.assertEqual(unicode_pass, conn._smtp_password)
168
self.assertEqual([('connect', 'localhost'),
170
('has_extn', 'starttls'),
171
('login', user, utf8_pass)], factory._calls)
172
smtp_username, smtp_password = factory._calls[-1][1:]
173
self.assertIsInstance(smtp_username, str)
174
self.assertIsInstance(smtp_password, str)
176
def test_create_connection(self):
177
factory = StubSMTPFactory()
178
conn = self.get_connection('', smtp_factory=factory)
179
conn._create_connection()
180
self.assertEqual([('connect', 'localhost'),
182
('has_extn', 'starttls')], factory._calls)
184
def test_create_connection_ehlo_fails(self):
185
# Check that we call HELO if EHLO failed.
186
factory = StubSMTPFactory(fail_on=['ehlo'])
187
conn = self.get_connection('', smtp_factory=factory)
188
conn._create_connection()
189
self.assertEqual([('connect', 'localhost'),
192
('has_extn', 'starttls')], factory._calls)
194
def test_create_connection_ehlo_helo_fails(self):
195
# Check that we raise an exception if both EHLO and HELO fail.
196
factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
197
conn = self.get_connection('', smtp_factory=factory)
198
self.assertRaises(errors.SMTPError, conn._create_connection)
199
self.assertEqual([('connect', 'localhost'),
201
('helo',)], factory._calls)
203
def test_create_connection_starttls(self):
204
# Check that STARTTLS plus a second EHLO are called if the
205
# server says it supports the feature.
206
factory = StubSMTPFactory(smtp_features=['starttls'])
207
conn = self.get_connection('', smtp_factory=factory)
208
conn._create_connection()
209
self.assertEqual([('connect', 'localhost'),
211
('has_extn', 'starttls'),
213
('ehlo',)], factory._calls)
215
def test_create_connection_starttls_fails(self):
216
# Check that we raise an exception if the server claims to
217
# support STARTTLS, but then fails when we try to activate it.
218
factory = StubSMTPFactory(fail_on=['starttls'],
219
smtp_features=['starttls'])
220
conn = self.get_connection('', smtp_factory=factory)
221
self.assertRaises(errors.SMTPError, conn._create_connection)
222
self.assertEqual([('connect', 'localhost'),
224
('has_extn', 'starttls'),
225
('starttls',)], factory._calls)
58
227
def test_get_message_addresses(self):
61
from_, to = SMTPConnection.get_message_addresses(msg)
230
from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
62
231
self.assertEqual('', from_)
63
232
self.assertEqual([], to)
67
236
msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
68
237
msg['Bcc'] = 'user@localhost'
70
from_, to = SMTPConnection.get_message_addresses(msg)
239
from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
240
self.assertEqual('jrandom@example.com', from_)
241
self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
242
'pperez@ejemplo.com', 'user@localhost']), sorted(to))
244
# now with bzrlib's EmailMessage
245
msg = email_message.EmailMessage(
246
'"J. Random Developer" <jrandom@example.com>',
247
['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
248
u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost' ],
251
from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
71
252
self.assertEqual('jrandom@example.com', from_)
72
253
self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
73
254
'pperez@ejemplo.com', 'user@localhost']), sorted(to))