# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests of the bzr serve command."""

import os
import signal
import subprocess
import threading

from bzrlib import errors
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir
from bzrlib.errors import ParamikoNotPresent
from bzrlib.tests import TestCaseWithTransport, TestSkipped
from bzrlib.transport import get_transport, smart


class TestBzrServe(TestCaseWithTransport):
    """Blackbox tests that run 'bzr serve' as a subprocess.

    The helper methods start the server either with --inet (talking over the
    subprocess's stdin/stdout) or with --port (talking over TCP), and check
    that the server process finishes with the expected output.
    """

    def assertInetServerShutsdownCleanly(self, process):
        """Shutdown the server process looking for errors.

        :param process: the process handle returned by start_server_inet.
        """
        # Shutdown the server: the server should shut down when it cannot read
        # from stdin anymore.
        process.stdin.close()
        # Hide stdin from the subprocess module, so it won't fail to close it.
        process.stdin = None
        result = self.finish_bzr_subprocess(process, retcode=0)
        # Neither stdout nor stderr should carry any output.
        self.assertEqual('', result[0])
        self.assertEqual('', result[1])

    def assertServerFinishesCleanly(self, process):
        """Shutdown the bzr serve instance process looking for errors.

        :param process: the process handle returned by start_server_port.
        """
        # Shutdown the server by interrupting it; an interrupted bzr is
        # expected to exit with code 3 and report the interruption on stderr.
        result = self.finish_bzr_subprocess(process, retcode=3,
                                            send_signal=signal.SIGINT)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])

    def start_server_inet(self, extra_options=()):
        """Start a bzr server subprocess using the --inet option.

        :param extra_options: extra options to give the server.
        :return: a tuple with the bzr process handle for passing to
            finish_bzr_subprocess, and a transport that talks to the server
            over the subprocess's stdin/stdout pipes.
        """
        # Serve from the current directory.  extra_options must be forwarded,
        # otherwise callers asking for e.g. --allow-writes silently get the
        # default (read-only) server.
        args = ['serve', '--inet']
        args.extend(extra_options)
        process = self.start_bzr_subprocess(args)

        # Connect to the server
        # We use this url because while this is no valid URL to connect to this
        # server instance, the transport needs a URL.
        medium = smart.SmartSimplePipesClientMedium(
            process.stdout, process.stdin)
        transport = smart.SmartTransport('bzr://localhost/', medium=medium)
        return process, transport

    def start_server_port(self, extra_options=()):
        """Start a bzr server subprocess.

        :param extra_options: extra options to give the server.
        :return: a tuple with the bzr process handle for passing to
            finish_bzr_subprocess, and the base url for the server.
        """
        # Serve from the current directory; port 0 is requested and the
        # actual port is read back from the server's first line of output.
        args = ['serve', '--port', 'localhost:0']
        args.extend(extra_options)
        process = self.start_bzr_subprocess(args, skip_if_plan_to_signal=True)
        port_line = process.stdout.readline()
        prefix = 'listening on port: '
        self.assertStartsWith(port_line, prefix)
        port = int(port_line[len(prefix):])
        return process, 'bzr://localhost:%d/' % port

    def test_bzr_serve_inet_readonly(self):
        """bzr server should provide a read only filesystem by default."""
        process, transport = self.start_server_inet()
        self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
        self.assertInetServerShutsdownCleanly(process)

    def test_bzr_serve_inet_readwrite(self):
        """An --inet server with --allow-writes should serve a usable branch."""
        # Make a branch for the server to serve.
        self.make_branch('.')

        process, transport = self.start_server_inet(['--allow-writes'])

        # We get a working branch
        branch = BzrDir.open_from_transport(transport).open_branch()
        branch.repository.get_revision_graph()
        self.assertEqual(None, branch.last_revision())
        self.assertInetServerShutsdownCleanly(process)

    def test_bzr_serve_port_readonly(self):
        """bzr server should provide a read only filesystem by default."""
        process, url = self.start_server_port()
        transport = get_transport(url)
        self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
        self.assertServerFinishesCleanly(process)

    def test_bzr_serve_port_readwrite(self):
        """A --port server with --allow-writes should serve a usable branch."""
        # Make a branch for the server to serve.
        self.make_branch('.')

        process, url = self.start_server_port(['--allow-writes'])

        # Connect to the server
        branch = Branch.open(url)

        # We get a working branch
        branch.repository.get_revision_graph()
        self.assertEqual(None, branch.last_revision())

        self.assertServerFinishesCleanly(process)

    def test_bzr_serve_no_args(self):
        """'bzr serve' with no arguments or options should not traceback."""
        out, err = self.run_bzr_error(
            ['bzr serve requires one of --inet or --port'], 'serve')

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        try:
            from bzrlib.transport.sftp import SFTPServer
        except ParamikoNotPresent:
            raise TestSkipped('Paramiko not installed')
        from bzrlib.tests.stub_sftp import StubServer

        # Make a branch
        self.make_branch('a_branch')

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves. Surely this has already been implemented
        # elsewhere?
        class StubSSHServer(StubServer):

            # Give the stub server access to the running test case so that it
            # can record the commands it is asked to execute.
            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes from
                # the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # NOTE(review): relays one byte at a time and closes the
                    # destination at end of input -- confirm this matches the
                    # intended relay behaviour.
                    while True:
                        data = read(1)
                        if not data:
                            close()
                            break
                        write(data)

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                for read, write, close in file_functions:
                    t = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    t.start()

                # Returning True presumably tells the SSH server layer that
                # the exec request was accepted -- TODO confirm.
                return True

        ssh_server = SFTPServer(StubSSHServer)
        # XXX: We *don't* want to override the default SSH vendor.
        # NOTE(review): an earlier comment here spoke of setting _vendor to
        # what _get_ssh_vendor returns, but no such assignment is made --
        # confirm whether one is needed.
        ssh_server.setUp()
        self.addCleanup(ssh_server.tearDown)
        port = ssh_server._listener.port

        # Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = os.path.abspath('a_branch')

        orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
        os.environ['BZR_REMOTE_PATH'] = self.get_bzr_path()
        try:
            branch = Branch.open(
                'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))

            branch.repository.get_revision_graph()
            self.assertEqual(None, branch.last_revision())
            # Check we can perform write operations
            branch.bzrdir.root_transport.mkdir('foo')
        finally:
            # Restore the BZR_REMOTE_PATH environment variable back to its
            # original state, even if one of the checks above failed, so that
            # later tests are not affected.
            if orig_bzr_remote_path is None:
                del os.environ['BZR_REMOTE_PATH']
            else:
                os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path

        # Exactly one command should have been run on the "remote" end: the
        # smart server in --inet mode with writes allowed.
        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes'
             % self.get_bzr_path()],
            self.command_executed)