31
28
TestCaseWithMemoryTransport
33
from bzrlib.plugins.launchpad import (
30
from bzrlib.transport import get_transport
31
from bzrlib.plugins.launchpad import _register_directory
37
32
from bzrlib.plugins.launchpad.lp_directory import (
38
33
LaunchpadDirectory)
39
from bzrlib.plugins.launchpad.account import get_lp_login, set_lp_login
40
from bzrlib.tests import (
46
def load_tests(standard_tests, module, loader):
47
result = loader.suiteClass()
48
t_tests, remaining_tests = tests.split_suite_by_condition(
49
standard_tests, tests.condition_isinstance((
52
transport_scenarios = [
53
('http', dict(server_class=PreCannedHTTPServer,)),
55
if tests.HTTPSServerFeature.available():
56
transport_scenarios.append(
57
('https', dict(server_class=PreCannedHTTPSServer,)),
59
tests.multiply_tests(t_tests, transport_scenarios, result)
61
# No parametrization for the remaining tests
62
result.addTests(remaining_tests)
34
from bzrlib.plugins.launchpad.account import get_lp_login
67
37
class FakeResolveFactory(object):
199
169
self.assertRaises(errors.InvalidURL,
200
170
directory._resolve, 'lp://ratotehunoahu')
202
def test_resolve_tilde_to_user(self):
203
factory = FakeResolveFactory(
204
self, '~username/apt/test', dict(urls=[
205
'bzr+ssh://bazaar.launchpad.net/~username/apt/test']))
206
directory = LaunchpadDirectory()
208
'bzr+ssh://bazaar.launchpad.net/~username/apt/test',
209
directory._resolve('lp:~/apt/test', factory, _lp_login='username'))
210
# Should also happen when the login is just set by config
211
set_lp_login('username')
213
'bzr+ssh://bazaar.launchpad.net/~username/apt/test',
214
directory._resolve('lp:~/apt/test', factory))
216
def test_tilde_fails_no_login(self):
217
factory = FakeResolveFactory(
218
self, '~username/apt/test', dict(urls=[
219
'bzr+ssh://bazaar.launchpad.net/~username/apt/test']))
220
self.assertIs(None, get_lp_login())
221
directory = LaunchpadDirectory()
222
e = self.assertRaises(errors.InvalidURL,
223
directory._resolve, 'lp:~/apt/test', factory)
226
173
class DirectoryOpenBranchTests(TestCaseWithMemoryTransport):
239
186
directories.remove('lp:')
240
187
directories.register('lp:', FooService, 'Map lp URLs to local urls')
241
188
self.addCleanup(_register_directory)
242
self.addCleanup(directories.remove, 'lp:')
243
t = transport.get_transport('lp:///apt')
244
branch = Branch.open_from_transport(t)
189
self.addCleanup(lambda: directories.remove('lp:'))
190
transport = get_transport('lp:///apt')
191
branch = Branch.open_from_transport(transport)
245
192
self.assertEqual(target_branch.base, branch.base)
248
class PredefinedRequestHandler(http_server.TestingHTTPRequestHandler):
249
"""Request handler for a unique and pre-defined request.
251
The only thing we care about here is that we receive a connection. But
252
since we want to dialog with a real http client, we have to send it correct
255
We expect to receive a *single* request nothing more (and we won't even
256
check what request it is), the tests will recognize us from our response.
259
def handle_one_request(self):
260
tcs = self.server.test_case_server
261
requestline = self.rfile.readline()
262
headers = self.MessageClass(self.rfile, 0)
263
if requestline.startswith('POST'):
264
# The body should be a single line (or we don't know where it ends
265
# and we don't want to issue a blocking read)
266
body = self.rfile.readline()
268
self.wfile.write(tcs.canned_response)
271
class PreCannedServerMixin(object):
274
super(PreCannedServerMixin, self).__init__(
275
request_handler=PredefinedRequestHandler)
276
# Bytes read and written by the server
278
self.bytes_written = 0
279
self.canned_response = None
282
class PreCannedHTTPServer(PreCannedServerMixin, http_server.HttpServer):
286
if tests.HTTPSServerFeature.available():
287
from bzrlib.tests import https_server
288
class PreCannedHTTPSServer(PreCannedServerMixin, https_server.HTTPSServer):
292
class TestXMLRPCTransport(tests.TestCase):
298
tests.TestCase.setUp(self)
299
self.server = self.server_class()
300
self.server.start_server()
301
# Ensure we don't clobber env
302
self._captureVar('BZR_LP_XMLRPC_URL', None)
305
self.server.stop_server()
306
tests.TestCase.tearDown(self)
308
def set_canned_response(self, server, path):
309
response_format = '''HTTP/1.1 200 OK\r
310
Date: Tue, 11 Jul 2006 04:32:56 GMT\r
311
Server: Apache/2.0.54 (Fedora)\r
312
Last-Modified: Sun, 23 Apr 2006 19:35:20 GMT\r
313
ETag: "56691-23-38e9ae00"\r
314
Accept-Ranges: bytes\r
315
Content-Length: %(length)d\r
317
Content-Type: text/plain; charset=UTF-8\r
319
<?xml version='1.0'?>
327
<value><string>bzr+ssh://bazaar.launchpad.net/%(path)s</string></value>
328
<value><string>http://bazaar.launchpad.net/%(path)s</string></value>
329
</data></array></value>
336
length = 334 + 2 * len(path)
337
server.canned_response = response_format % dict(length=length,
340
def do_request(self, server_url):
341
os.environ['BZR_LP_XMLRPC_URL'] = self.server.get_url()
342
service = lp_registration.LaunchpadService()
343
resolve = lp_registration.ResolveLaunchpadPathRequest('bzr')
344
result = resolve.submit(service)
347
def test_direct_request(self):
348
self.set_canned_response(self.server, '~bzr-pqm/bzr/bzr.dev')
349
result = self.do_request(self.server.get_url())
350
urls = result.get('urls', None)
351
self.assertIsNot(None, urls)
353
['bzr+ssh://bazaar.launchpad.net/~bzr-pqm/bzr/bzr.dev',
354
'http://bazaar.launchpad.net/~bzr-pqm/bzr/bzr.dev'],
356
# FIXME: we need to test with a real proxy, I can't find a way to simulate
357
# CONNECT without leaving one server hanging the test :-/ Since that may be
358
# related to the leaking tests problems, I'll punt for now -- vila 20091030