14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
import BaseHTTPServer, SimpleHTTPServer, socket, errno, time
18
from bzrlib.tests import TestCaseInTempDir
19
from bzrlib.osutils import relpath
17
import BaseHTTPServer, SimpleHTTPServer
18
from bzrlib.selftest import TestCaseInTempDir
22
21
class WebserverNotAvailable(Exception):
24
class BadWebserverPath(ValueError):
26
return 'path %s is not in %s' % self.args
25
28
class TestingHTTPRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
26
29
def log_message(self, format, *args):
27
self.server.test_case.log("webserver - %s - - [%s] %s",
28
self.address_string(),
29
self.log_date_time_string(),
32
def handle_one_request(self):
33
"""Handle a single HTTP request.
35
You normally don't need to override this method; see the class
36
__doc__ string for information on how to handle specific HTTP
37
commands such as GET and POST.
40
for i in xrange(1,11): # Don't try more than 10 times
42
self.raw_requestline = self.rfile.readline()
43
except socket.error, e:
44
if e.args[0] in (errno.EAGAIN, errno.EWOULDBLOCK):
45
# omitted for now because some tests look at the log of
46
# the server and expect to see no errors. see recent
47
# email thread. -- mbp 20051021.
48
## self.log_message('EAGAIN (%d) while reading from raw_requestline' % i)
54
if not self.raw_requestline:
55
self.close_connection = 1
57
if not self.parse_request(): # An error code has been sent, just exit
59
mname = 'do_' + self.command
60
if not hasattr(self, mname):
61
self.send_error(501, "Unsupported method (%r)" % self.command)
63
method = getattr(self, mname)
30
self.server.test_case.log("webserver - %s - - [%s] %s\n" %
31
(self.address_string(),
32
self.log_date_time_string(),
66
35
class TestingHTTPServer(BaseHTTPServer.HTTPServer):
67
36
def __init__(self, server_address, RequestHandlerClass, test_case):
113
81
def get_remote_url(self, path):
84
path_parts = path.split(os.path.sep)
116
85
if os.path.isabs(path):
117
remote_path = relpath(self.test_dir, path)
86
if path_parts[:len(self._local_path_parts)] != \
87
self._local_path_parts:
88
raise BadWebserverPath(path, self.test_dir)
89
remote_path = '/'.join(path_parts[len(self._local_path_parts):])
91
remote_path = '/'.join(path_parts)
121
93
self._http_starting.acquire()
122
94
self._http_starting.release()
123
95
return self._http_base_url + remote_path
126
TestCaseInTempDir.setUp(self)
98
super(TestCaseWithWebserver, self).setUp()
127
99
import threading, os
100
self._local_path_parts = self.test_dir.split(os.path.sep)
128
101
self._http_starting = threading.Lock()
129
102
self._http_starting.acquire()
130
103
self._http_running = True
132
105
self._http_thread = threading.Thread(target=self._http_start)
133
106
self._http_thread.setDaemon(True)
134
107
self._http_thread.start()
135
self._http_proxy = os.environ.get("http_proxy")
136
if self._http_proxy is not None:
137
del os.environ["http_proxy"]
139
109
def tearDown(self):
140
110
self._http_running = False
141
111
self._http_thread.join()
142
if self._http_proxy is not None:
144
os.environ["http_proxy"] = self._http_proxy
145
TestCaseInTempDir.tearDown(self)
112
super(TestCaseWithWebserver, self).tearDown()