~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/smart/server.py

  • Committer: INADA Naoki
  • Date: 2011-05-05 13:03:12 UTC
  • mto: This revision was merged to the branch mainline in revision 5891.
  • Revision ID: songofacandy@gmail.com-20110505130312-ozvqmfjt7cuwi72k
Run build_mo when build is called.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006, 2007 Canonical Ltd
 
1
# Copyright (C) 2006-2010 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
17
"""Server for smart-server protocol."""
18
18
 
19
19
import errno
 
20
import os.path
20
21
import socket
21
 
import os
 
22
import sys
22
23
import threading
23
24
 
24
25
from bzrlib.hooks import Hooks
 
26
from bzrlib import (
 
27
    errors,
 
28
    trace,
 
29
    transport as _mod_transport,
 
30
)
 
31
from bzrlib.lazy_import import lazy_import
 
32
lazy_import(globals(), """
25
33
from bzrlib.smart import medium
 
34
from bzrlib.transport import (
 
35
    chroot,
 
36
    pathfilter,
 
37
    )
26
38
from bzrlib import (
27
 
    trace,
28
 
    transport,
29
39
    urlutils,
30
 
)
31
 
from bzrlib.smart.medium import SmartServerSocketStreamMedium
 
40
    )
 
41
""")
32
42
 
33
43
 
34
44
class SmartTCPServer(object):
35
 
    """Listens on a TCP socket and accepts connections from smart clients
 
45
    """Listens on a TCP socket and accepts connections from smart clients.
 
46
 
 
47
    Each connection will be served by a SmartServerSocketStreamMedium running in
 
48
    a thread.
36
49
 
37
50
    hooks: An instance of SmartServerHooks.
38
51
    """
39
52
 
40
 
    def __init__(self, backing_transport, host='127.0.0.1', port=0):
 
53
    def __init__(self, backing_transport, root_client_path='/'):
41
54
        """Construct a new server.
42
55
 
43
56
        To actually start it running, call either start_background_thread or
44
57
        serve.
45
58
 
 
59
        :param backing_transport: The transport to serve.
 
60
        :param root_client_path: The client path that will correspond to root
 
61
            of backing_transport.
 
62
        """
 
63
        self.backing_transport = backing_transport
 
64
        self.root_client_path = root_client_path
 
65
 
 
66
    def start_server(self, host, port):
 
67
        """Create the server listening socket.
 
68
 
46
69
        :param host: Name of the interface to listen on.
47
70
        :param port: TCP port to listen on, or 0 to allocate a transient port.
48
71
        """
53
76
        from socket import error as socket_error
54
77
        self._socket_error = socket_error
55
78
        self._socket_timeout = socket_timeout
56
 
        self._server_socket = socket.socket()
57
 
        self._server_socket.bind((host, port))
 
79
        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
 
80
            socket.SOCK_STREAM, 0, socket.AI_PASSIVE)[0]
 
81
 
 
82
        (family, socktype, proto, canonname, sockaddr) = addrs
 
83
 
 
84
        self._server_socket = socket.socket(family, socktype, proto)
 
85
        # SO_REUSERADDR has a different meaning on Windows
 
86
        if sys.platform != 'win32':
 
87
            self._server_socket.setsockopt(socket.SOL_SOCKET,
 
88
                socket.SO_REUSEADDR, 1)
 
89
        try:
 
90
            self._server_socket.bind(sockaddr)
 
91
        except self._socket_error, message:
 
92
            raise errors.CannotBindAddress(host, port, message)
58
93
        self._sockname = self._server_socket.getsockname()
59
94
        self.port = self._sockname[1]
60
95
        self._server_socket.listen(1)
61
96
        self._server_socket.settimeout(1)
62
 
        self.backing_transport = backing_transport
63
97
        self._started = threading.Event()
64
98
        self._stopped = threading.Event()
65
99
 
66
 
    def serve(self):
 
100
    def _backing_urls(self):
 
101
        # There are three interesting urls:
 
102
        # The URL the server can be contacted on. (e.g. bzr://host/)
 
103
        # The URL that a commit done on the same machine as the server will
 
104
        # have within the servers space. (e.g. file:///home/user/source)
 
105
        # The URL that will be given to other hooks in the same process -
 
106
        # the URL of the backing transport itself. (e.g. filtered-36195:///)
 
107
        # We need all three because:
 
108
        #  * other machines see the first
 
109
        #  * local commits on this machine should be able to be mapped to
 
110
        #    this server
 
111
        #  * commits the server does itself need to be mapped across to this
 
112
        #    server.
 
113
        # The latter two urls are different aliases to the servers url,
 
114
        # so we group those in a list - as there might be more aliases
 
115
        # in the future.
 
116
        urls = [self.backing_transport.base]
 
117
        try:
 
118
            urls.append(self.backing_transport.external_url())
 
119
        except errors.InProcessTransport:
 
120
            pass
 
121
        return urls
 
122
 
 
123
    def run_server_started_hooks(self, backing_urls=None):
 
124
        if backing_urls is None:
 
125
            backing_urls = self._backing_urls()
 
126
        for hook in SmartTCPServer.hooks['server_started']:
 
127
            hook(backing_urls, self.get_url())
 
128
        for hook in SmartTCPServer.hooks['server_started_ex']:
 
129
            hook(backing_urls, self)
 
130
 
 
131
    def run_server_stopped_hooks(self, backing_urls=None):
 
132
        if backing_urls is None:
 
133
            backing_urls = self._backing_urls()
 
134
        for hook in SmartTCPServer.hooks['server_stopped']:
 
135
            hook(backing_urls, self.get_url())
 
136
 
 
137
    def serve(self, thread_name_suffix=''):
67
138
        self._should_terminate = False
68
 
        for hook in SmartTCPServer.hooks['server_started']:
69
 
            hook(self.backing_transport.base, self.get_url())
 
139
        # for hooks we are letting code know that a server has started (and
 
140
        # later stopped).
 
141
        self.run_server_started_hooks()
70
142
        self._started.set()
71
143
        try:
72
144
            try:
83
155
                        if e.args[0] != errno.EBADF:
84
156
                            trace.warning("listening socket error: %s", e)
85
157
                    else:
86
 
                        self.serve_conn(conn)
 
158
                        if self._should_terminate:
 
159
                            break
 
160
                        self.serve_conn(conn, thread_name_suffix)
87
161
            except KeyboardInterrupt:
88
162
                # dont log when CTRL-C'd.
89
163
                raise
90
164
            except Exception, e:
91
 
                trace.error("Unhandled smart server error.")
92
 
                trace.log_exception_quietly()
 
165
                trace.report_exception(sys.exc_info(), sys.stderr)
93
166
                raise
94
167
        finally:
95
168
            self._stopped.set()
99
172
            except self._socket_error:
100
173
                # ignore errors on close
101
174
                pass
102
 
            for hook in SmartTCPServer.hooks['server_stopped']:
103
 
                hook(self.backing_transport.base, self.get_url())
 
175
            self.run_server_stopped_hooks()
104
176
 
105
177
    def get_url(self):
106
178
        """Return the url of the server"""
107
 
        return "bzr://%s:%d/" % self._sockname
 
179
        return "bzr://%s:%s/" % (self._sockname[0], self._sockname[1])
108
180
 
109
 
    def serve_conn(self, conn):
 
181
    def serve_conn(self, conn, thread_name_suffix):
110
182
        # For WIN32, where the timeout value from the listening socket
111
 
        # propogates to the newly accepted socket.
 
183
        # propagates to the newly accepted socket.
112
184
        conn.setblocking(True)
113
185
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
114
 
        handler = SmartServerSocketStreamMedium(conn, self.backing_transport)
115
 
        connection_thread = threading.Thread(None, handler.serve, name='smart-server-child')
 
186
        handler = medium.SmartServerSocketStreamMedium(
 
187
            conn, self.backing_transport, self.root_client_path)
 
188
        thread_name = 'smart-server-child' + thread_name_suffix
 
189
        connection_thread = threading.Thread(
 
190
            None, handler.serve, name=thread_name)
 
191
        # FIXME: This thread is never joined, it should at least be collected
 
192
        # somewhere so that tests that want to check for leaked threads can get
 
193
        # rid of them -- vila 20100531
116
194
        connection_thread.setDaemon(True)
117
195
        connection_thread.start()
 
196
        return connection_thread
118
197
 
119
 
    def start_background_thread(self):
 
198
    def start_background_thread(self, thread_name_suffix=''):
120
199
        self._started.clear()
121
200
        self._server_thread = threading.Thread(None,
122
 
                self.serve,
 
201
                self.serve, args=(thread_name_suffix,),
123
202
                name='server-' + self.get_url())
124
203
        self._server_thread.setDaemon(True)
125
204
        self._server_thread.start()
131
210
        self._should_terminate = True
132
211
        # close the socket - gives error to connections from here on in,
133
212
        # rather than a connection reset error to connections made during
134
 
        # the period between setting _should_terminate = True and 
 
213
        # the period between setting _should_terminate = True and
135
214
        # the current request completing/aborting. It may also break out the
136
215
        # main loop if it was currently in accept() (on some platforms).
137
216
        try:
160
239
        These are all empty initially, because by default nothing should get
161
240
        notified.
162
241
        """
163
 
        Hooks.__init__(self)
164
 
        # Introduced in 0.16:
165
 
        # invoked whenever the server starts serving a directory.
166
 
        # The api signature is (backing url, public url).
167
 
        self['server_started'] = []
168
 
        # Introduced in 0.16:
169
 
        # invoked whenever the server stops serving a directory.
170
 
        # The api signature is (backing url, public url).
171
 
        self['server_stopped'] = []
 
242
        Hooks.__init__(self, "bzrlib.smart.server", "SmartTCPServer.hooks")
 
243
        self.add_hook('server_started',
 
244
            "Called by the bzr server when it starts serving a directory. "
 
245
            "server_started is called with (backing urls, public url), "
 
246
            "where backing_url is a list of URLs giving the "
 
247
            "server-specific directory locations, and public_url is the "
 
248
            "public URL for the directory being served.", (0, 16))
 
249
        self.add_hook('server_started_ex',
 
250
            "Called by the bzr server when it starts serving a directory. "
 
251
            "server_started is called with (backing_urls, server_obj).",
 
252
            (1, 17))
 
253
        self.add_hook('server_stopped',
 
254
            "Called by the bzr server when it stops serving a directory. "
 
255
            "server_stopped is called with the same parameters as the "
 
256
            "server_started hook: (backing_urls, public_url).", (0, 16))
172
257
 
173
258
SmartTCPServer.hooks = SmartServerHooks()
174
259
 
175
260
 
176
 
class SmartTCPServer_for_testing(SmartTCPServer):
177
 
    """Server suitable for use by transport tests.
178
 
    
179
 
    This server is backed by the process's cwd.
180
 
    """
181
 
 
182
 
    def __init__(self):
183
 
        self._homedir = urlutils.local_path_to_url(os.getcwd())[7:]
184
 
        # The server is set up by default like for ssh access: the client
185
 
        # passes filesystem-absolute paths; therefore the server must look
186
 
        # them up relative to the root directory.  it might be better to act
187
 
        # a public server and have the server rewrite paths into the test
188
 
        # directory.
189
 
        SmartTCPServer.__init__(self,
190
 
            transport.get_transport(urlutils.local_path_to_url('/')))
191
 
        
192
 
    def get_backing_transport(self, backing_transport_server):
193
 
        """Get a backing transport from a server we are decorating."""
194
 
        return transport.get_transport(backing_transport_server.get_url())
195
 
 
196
 
    def setUp(self, backing_transport_server=None):
197
 
        """Set up server for testing"""
198
 
        from bzrlib.transport.chroot import TestingChrootServer
199
 
        if backing_transport_server is None:
200
 
            from bzrlib.transport.local import LocalURLServer
201
 
            backing_transport_server = LocalURLServer()
202
 
        self.chroot_server = TestingChrootServer()
203
 
        self.chroot_server.setUp(backing_transport_server)
204
 
        self.backing_transport = transport.get_transport(
205
 
            self.chroot_server.get_url())
206
 
        self.start_background_thread()
207
 
 
208
 
    def tearDown(self):
209
 
        self.stop_background_thread()
210
 
 
211
 
    def get_bogus_url(self):
212
 
        """Return a URL which will fail to connect"""
213
 
        return 'bzr://127.0.0.1:1/'
214
 
 
 
261
def _local_path_for_transport(transport):
 
262
    """Return a local path for transport, if reasonably possible.
 
263
    
 
264
    This function works even if transport's url has a "readonly+" prefix,
 
265
    unlike local_path_from_url.
 
266
    
 
267
    This essentially recovers the --directory argument the user passed to "bzr
 
268
    serve" from the transport passed to serve_bzr.
 
269
    """
 
270
    try:
 
271
        base_url = transport.external_url()
 
272
    except (errors.InProcessTransport, NotImplementedError):
 
273
        return None
 
274
    else:
 
275
        # Strip readonly prefix
 
276
        if base_url.startswith('readonly+'):
 
277
            base_url = base_url[len('readonly+'):]
 
278
        try:
 
279
            return urlutils.local_path_from_url(base_url)
 
280
        except errors.InvalidURL:
 
281
            return None
 
282
 
 
283
 
 
284
class BzrServerFactory(object):
 
285
    """Helper class for serve_bzr."""
 
286
 
 
287
    def __init__(self, userdir_expander=None, get_base_path=None):
 
288
        self.cleanups = []
 
289
        self.base_path = None
 
290
        self.backing_transport = None
 
291
        if userdir_expander is None:
 
292
            userdir_expander = os.path.expanduser
 
293
        self.userdir_expander = userdir_expander
 
294
        if get_base_path is None:
 
295
            get_base_path = _local_path_for_transport
 
296
        self.get_base_path = get_base_path
 
297
 
 
298
    def _expand_userdirs(self, path):
 
299
        """Translate /~/ or /~user/ to e.g. /home/foo, using
 
300
        self.userdir_expander (os.path.expanduser by default).
 
301
 
 
302
        If the translated path would fall outside base_path, or the path does
 
303
        not start with ~, then no translation is applied.
 
304
 
 
305
        If the path is inside, it is adjusted to be relative to the base path.
 
306
 
 
307
        e.g. if base_path is /home, and the expanded path is /home/joe, then
 
308
        the translated path is joe.
 
309
        """
 
310
        result = path
 
311
        if path.startswith('~'):
 
312
            expanded = self.userdir_expander(path)
 
313
            if not expanded.endswith('/'):
 
314
                expanded += '/'
 
315
            if expanded.startswith(self.base_path):
 
316
                result = expanded[len(self.base_path):]
 
317
        return result
 
318
 
 
319
    def _make_expand_userdirs_filter(self, transport):
 
320
        return pathfilter.PathFilteringServer(transport, self._expand_userdirs)
 
321
 
 
322
    def _make_backing_transport(self, transport):
 
323
        """Chroot transport, and decorate with userdir expander."""
 
324
        self.base_path = self.get_base_path(transport)
 
325
        chroot_server = chroot.ChrootServer(transport)
 
326
        chroot_server.start_server()
 
327
        self.cleanups.append(chroot_server.stop_server)
 
328
        transport = _mod_transport.get_transport(chroot_server.get_url())
 
329
        if self.base_path is not None:
 
330
            # Decorate the server's backing transport with a filter that can
 
331
            # expand homedirs.
 
332
            expand_userdirs = self._make_expand_userdirs_filter(transport)
 
333
            expand_userdirs.start_server()
 
334
            self.cleanups.append(expand_userdirs.stop_server)
 
335
            transport = _mod_transport.get_transport(expand_userdirs.get_url())
 
336
        self.transport = transport
 
337
 
 
338
    def _make_smart_server(self, host, port, inet):
 
339
        if inet:
 
340
            smart_server = medium.SmartServerPipeStreamMedium(
 
341
                sys.stdin, sys.stdout, self.transport)
 
342
        else:
 
343
            if host is None:
 
344
                host = medium.BZR_DEFAULT_INTERFACE
 
345
            if port is None:
 
346
                port = medium.BZR_DEFAULT_PORT
 
347
            smart_server = SmartTCPServer(self.transport)
 
348
            smart_server.start_server(host, port)
 
349
            trace.note('listening on port: %s' % smart_server.port)
 
350
        self.smart_server = smart_server
 
351
 
 
352
    def _change_globals(self):
 
353
        from bzrlib import lockdir, ui
 
354
        # For the duration of this server, no UI output is permitted. note
 
355
        # that this may cause problems with blackbox tests. This should be
 
356
        # changed with care though, as we dont want to use bandwidth sending
 
357
        # progress over stderr to smart server clients!
 
358
        old_factory = ui.ui_factory
 
359
        old_lockdir_timeout = lockdir._DEFAULT_TIMEOUT_SECONDS
 
360
        def restore_default_ui_factory_and_lockdir_timeout():
 
361
            ui.ui_factory = old_factory
 
362
            lockdir._DEFAULT_TIMEOUT_SECONDS = old_lockdir_timeout
 
363
        self.cleanups.append(restore_default_ui_factory_and_lockdir_timeout)
 
364
        ui.ui_factory = ui.SilentUIFactory()
 
365
        lockdir._DEFAULT_TIMEOUT_SECONDS = 0
 
366
 
 
367
    def set_up(self, transport, host, port, inet):
 
368
        self._make_backing_transport(transport)
 
369
        self._make_smart_server(host, port, inet)
 
370
        self._change_globals()
 
371
 
 
372
    def tear_down(self):
 
373
        for cleanup in reversed(self.cleanups):
 
374
            cleanup()
 
375
 
 
376
 
 
377
def serve_bzr(transport, host=None, port=None, inet=False):
 
378
    """This is the default implementation of 'bzr serve'.
 
379
    
 
380
    It creates a TCP or pipe smart server on 'transport, and runs it.  The
 
381
    transport will be decorated with a chroot and pathfilter (using
 
382
    os.path.expanduser).
 
383
    """
 
384
    bzr_server = BzrServerFactory()
 
385
    try:
 
386
        bzr_server.set_up(transport, host, port, inet)
 
387
        bzr_server.smart_server.serve()
 
388
    finally:
 
389
        bzr_server.tear_down()
215
390