~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_smtp_connection.py

  • Committer: John Arbash Meinel
  • Date: 2008-08-18 22:34:21 UTC
  • mto: (3606.5.6 1.6)
  • mto: This revision was merged to the branch mainline in revision 3641.
  • Revision ID: john@arbash-meinel.com-20080818223421-todjny24vj4faj4t
Add tests for the fetching behavior.

The proper parameter to pass is 'unordered'; add an assert for it, and
fix the callers that were passing 'unsorted' instead.
Add tests that we make the right get_record_stream call based
on the value of _fetch_uses_deltas.
Fix the fetch request for signatures.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2007 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
from cStringIO import StringIO
 
18
from email.Message import Message
 
19
import errno
 
20
import smtplib
 
21
import socket
 
22
import sys
 
23
 
 
24
from bzrlib import (
 
25
    config,
 
26
    email_message,
 
27
    errors,
 
28
    smtp_connection,
 
29
    tests,
 
30
    ui,
 
31
    )
 
32
 
 
33
 
 
34
def connection_refuser():
    """Return an smtplib.SMTP object whose connect() always refuses.

    The SMTP object is built with no arguments and its ``connect``
    attribute is then replaced by a function that unconditionally raises
    ``socket.error`` with ``errno.ECONNREFUSED``.  The function itself is
    meant to be handed over as an SMTP factory (see test_missing_server).

    :return: The patched smtplib.SMTP instance.
    """
    def connect(server):
        # Stand-in for SMTP.connect(): the server argument is ignored and
        # the connection is always reported as refused.
        raise socket.error(errno.ECONNREFUSED, 'Connection Refused')

    smtp = smtplib.SMTP()
    # Shadow the bound method on this instance only.
    smtp.connect = connect
    return smtp
 
40
 
 
41
 
 
42
class StubSMTPFactory(object):
    """A fake SMTP connection to test the connection setup.

    The object is both the factory and the connection: calling it returns
    the object itself.  Each SMTP-like method appends a tuple describing the
    call to ``_calls`` so that tests can compare the exact sequence of
    commands that were issued.
    """

    def __init__(self, fail_on=None, smtp_features=None):
        """Create the stub.

        :param fail_on: Names of the commands ('helo', 'ehlo', 'starttls')
            that must report a failure.  None means nothing fails.
        :param smtp_features: Extension names that has_extn() reports as
            supported once EHLO has succeeded.  None means no extension.
        """
        self._fail_on = fail_on or []
        # Chronological record of the calls, one tuple per call.
        self._calls = []
        self._smtp_features = smtp_features or []
        # has_extn() answers False until this flag is set.
        self._ehlo_called = False

    def __call__(self):
        # The factory pretends to be a connection
        return self

    def connect(self, server):
        """Record the connection attempt; nothing is actually opened."""
        self._calls.append(('connect', server))

    def helo(self):
        """Record a HELO and return a (code, message) reply."""
        self._calls.append(('helo',))
        if 'helo' in self._fail_on:
            return 500, 'helo failure'
        return 200, 'helo success'

    def ehlo(self):
        """Record an EHLO and return a (code, message) reply.

        A successful EHLO is what allows has_extn() to report features.
        """
        self._calls.append(('ehlo',))
        if 'ehlo' in self._fail_on:
            return 500, 'ehlo failure'
        self._ehlo_called = True
        return 200, 'ehlo success'

    def has_extn(self, extension):
        """Record the query and tell whether ``extension`` is supported.

        :return: False as long as no EHLO (or STARTTLS) has succeeded,
            otherwise whether ``extension`` is in the configured features.
        """
        self._calls.append(('has_extn', extension))
        return self._ehlo_called and extension in self._smtp_features

    def starttls(self):
        """Record a STARTTLS and return a (code, message) reply."""
        self._calls.append(('starttls',))
        if 'starttls' in self._fail_on:
            return 500, 'starttls failure'
        # Like ehlo(), a successful STARTTLS sets the flag has_extn() reads.
        self._ehlo_called = True
        return 200, 'starttls success'
 
83
 
 
84
 
 
85
class WideOpenSMTPFactory(StubSMTPFactory):
    """A fake smtp server that implements login by accepting anybody."""

    def login(self, user, password):
        """Accept any credentials: do nothing and return None."""
        pass
 
90
 
 
91
 
 
92
class TestSMTPConnection(tests.TestCaseInTempDir):
    """Tests for smtp_connection.SMTPConnection setup and address handling."""

    def get_connection(self, text, smtp_factory=None):
        """Build an SMTPConnection configured from ``text``.

        :param text: Content of the configuration file to parse.
        :param smtp_factory: Optional callable handed to SMTPConnection as
            ``_smtp_factory``.
        :return: The new smtp_connection.SMTPConnection.
        """
        my_config = config.GlobalConfig()
        config_file = StringIO(text)
        my_config._get_parser(config_file)
        return smtp_connection.SMTPConnection(my_config,
                                              _smtp_factory=smtp_factory)

    def test_defaults(self):
        # With an empty configuration, only the server has a value.
        conn = self.get_connection('')
        self.assertEqual('localhost', conn._smtp_server)
        self.assertIs(None, conn._smtp_username)
        self.assertIs(None, conn._smtp_password)

    def test_smtp_server(self):
        conn = self.get_connection('[DEFAULT]\nsmtp_server=host:10\n')
        self.assertEqual('host:10', conn._smtp_server)

    def test_missing_server(self):
        # A refused connection raises a different error depending on
        # whether the server was left to its default or configured.
        conn = self.get_connection('', smtp_factory=connection_refuser)
        self.assertRaises(errors.DefaultSMTPConnectionRefused, conn._connect)
        conn = self.get_connection('[DEFAULT]\nsmtp_server=smtp.example.com\n',
                                   smtp_factory=connection_refuser)
        self.assertRaises(errors.SMTPConnectionRefused, conn._connect)

    def test_smtp_username(self):
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_username)

        conn = self.get_connection('[DEFAULT]\nsmtp_username=joebody\n')
        self.assertEqual(u'joebody', conn._smtp_username)

    def test_smtp_password_from_config(self):
        conn = self.get_connection('')
        self.assertIs(None, conn._smtp_password)

        conn = self.get_connection('[DEFAULT]\nsmtp_password=mypass\n')
        self.assertEqual(u'mypass', conn._smtp_password)

    def test_smtp_password_from_user(self):
        # Only the user name is configured: the password has to be read
        # from the ui factory's stdin when connecting.
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertIs(None, conn._smtp_password)

        # NOTE(review): this replaces the global ui factory; presumably the
        # base TestCase restores it after the test -- confirm.
        ui.ui_factory = tests.TestUIFactory(stdin=password + '\n',
                                            stdout=tests.StringIOWrapper())
        conn._connect()
        self.assertEqual(password, conn._smtp_password)
        # stdin should be empty (the provided password has been consumed)
        self.assertEqual('', ui.ui_factory.stdin.readline())

    def test_smtp_password_from_auth_config(self):
        user = 'joe'
        password = 'hispass'
        factory = WideOpenSMTPFactory()
        conn = self.get_connection('[DEFAULT]\nsmtp_username=%s\n' % user,
                                   smtp_factory=factory)
        self.assertEqual(user, conn._smtp_username)
        self.assertIs(None, conn._smtp_password)
        # Create a config file with the right password
        conf = config.AuthenticationConfig()
        conf._get_config().update({'smtptest':
                                       {'scheme': 'smtp', 'user': user,
                                        'password': password}})
        conf._save()

        conn._connect()
        self.assertEqual(password, conn._smtp_password)

    def test_create_connection(self):
        factory = StubSMTPFactory()
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_fails(self):
        # Check that we call HELO if EHLO failed.
        factory = StubSMTPFactory(fail_on=['ehlo'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',),
                          ('has_extn', 'starttls')], factory._calls)

    def test_create_connection_ehlo_helo_fails(self):
        # Check that we raise an exception if both EHLO and HELO fail.
        factory = StubSMTPFactory(fail_on=['ehlo', 'helo'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('helo',)], factory._calls)

    def test_create_connection_starttls(self):
        # Check that STARTTLS plus a second EHLO are called if the
        # server says it supports the feature.
        factory = StubSMTPFactory(smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        conn._create_connection()
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',),
                          ('ehlo',)], factory._calls)

    def test_create_connection_starttls_fails(self):
        # Check that we raise an exception if the server claims to
        # support STARTTLS, but then fails when we try to activate it.
        factory = StubSMTPFactory(fail_on=['starttls'],
                                  smtp_features=['starttls'])
        conn = self.get_connection('', smtp_factory=factory)
        self.assertRaises(errors.SMTPError, conn._create_connection)
        self.assertEqual([('connect', 'localhost'),
                          ('ehlo',),
                          ('has_extn', 'starttls'),
                          ('starttls',)], factory._calls)

    def test_get_message_addresses(self):
        # An empty message has neither a sender nor recipients.
        msg = Message()

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('', from_)
        self.assertEqual([], to)

        # Recipients are gathered from the To, CC and Bcc headers.
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        msg['To'] = 'John Doe <john@doe.com>, Jane Doe <jane@doe.com>'
        msg['CC'] = u'Pepe P\xe9rez <pperez@ejemplo.com>'
        msg['Bcc'] = 'user@localhost'

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

        # now with bzrlib's EmailMessage
        msg = email_message.EmailMessage(
            '"J. Random Developer" <jrandom@example.com>',
            ['John Doe <john@doe.com>', 'Jane Doe <jane@doe.com>',
             u'Pepe P\xe9rez <pperez@ejemplo.com>', 'user@localhost' ],
            'subject')

        from_, to = smtp_connection.SMTPConnection.get_message_addresses(msg)
        self.assertEqual('jrandom@example.com', from_)
        self.assertEqual(sorted(['john@doe.com', 'jane@doe.com',
            'pperez@ejemplo.com', 'user@localhost']), sorted(to))

    def test_destination_address_required(self):
        # Minimal configuration: every option is unset.
        class FakeConfig(object):
            def get_user_option(self, option):
                return None

        # A plain Message without any recipient header.
        msg = Message()
        msg['From'] = '"J. Random Developer" <jrandom@example.com>'
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        # An EmailMessage whose recipient is an empty string.
        msg = email_message.EmailMessage('from@from.com', '', 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)

        # An EmailMessage whose recipient list is empty.
        msg = email_message.EmailMessage('from@from.com', [], 'subject')
        self.assertRaises(
            errors.NoDestinationAddress,
            smtp_connection.SMTPConnection(FakeConfig()).send_email, msg)