1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Tests of the bzr serve command."""
29
revision as _mod_revision,
31
from bzrlib.branch import Branch
32
from bzrlib.bzrdir import BzrDir
33
from bzrlib.errors import ParamikoNotPresent
34
from bzrlib.smart import medium
35
from bzrlib.tests import TestCaseWithTransport, TestSkipped
36
from bzrlib.transport import get_transport, remote
39
# Tests that start ``bzr serve`` in a subprocess (over --inet pipes or on a
# TCP port) and then talk to it through bzrlib's transport/branch APIs.
class TestBzrServe(TestCaseWithTransport):
41
# Assert that a server subprocess finishes without producing any output.
#
# Finishes ``process`` through finish_bzr_subprocess and asserts that both
# result[0] and result[1] are empty strings (presumably the captured stdout
# and stderr -- confirm against finish_bzr_subprocess).
def assertInetServerShutsdownCleanly(self, process):
42
"""Shutdown the server process looking for errors."""
43
# Shutdown the server: the server should shut down when it cannot read
46
# NOTE(review): the comment above stops mid-sentence, and neither it nor the
# comment below is followed by a statement that touches the process's stdin
# in this copy.  Lines appear to be missing here -- confirm against upstream
# before relying on this helper.
# Hide stdin from the subprocess module, so it won't fail to close it.
48
result = self.finish_bzr_subprocess(process)
49
# Nothing may have been written on either of the two result streams.
self.assertEqual('', result[0])
50
self.assertEqual('', result[1])
52
def assertServerFinishesCleanly(self, process):
    """Interrupt the bzr serve process and check how it finished.

    The process is finished through finish_bzr_subprocess with
    ``send_signal=signal.SIGINT`` and ``retcode=3``.  The first element of
    the result must be empty and the second must be exactly
    ``'bzr: interrupted\\n'`` (presumably stdout and stderr respectively).

    :param process: the bzr process handle to finish.
    """
    outcome = self.finish_bzr_subprocess(
        process, retcode=3, send_signal=signal.SIGINT)
    # Check the two result slots in order against what an interrupted
    # server is expected to leave behind.
    for slot, expected in enumerate(('', 'bzr: interrupted\n')):
        self.assertEqual(expected, outcome[slot])
60
def start_server_inet(self, extra_options=()):
    """Start a bzr server subprocess using the --inet option.

    :param extra_options: extra options to give the server.
    :return: a tuple with the bzr process handle for passing to
        finish_bzr_subprocess, and a transport connected to the server
        through the subprocess's stdin/stdout pipes.
    """
    # Serve from the current directory.  The argument list is built the same
    # way start_server_port builds it, so that extra_options (for example
    # '--allow-writes') really reaches the server instead of being dropped.
    args = ['serve', '--inet']
    args.extend(extra_options)
    process = self.start_bzr_subprocess(args)

    # Connect to the server
    # We use this url because while this is no valid URL to connect to this
    # server instance, the transport needs a URL.
    client_medium = medium.SmartSimplePipesClientMedium(
        process.stdout, process.stdin)
    transport = remote.RemoteTransport(
        'bzr://localhost/', medium=client_medium)
    return process, transport
79
def start_server_port(self, extra_options=()):
    """Start a bzr server subprocess listening on a TCP port.

    The server is asked to serve on ``localhost:0``; the port it actually
    uses is parsed from the first line it prints, which must start with
    ``'listening on port: '``.

    :param extra_options: extra options to give the server.
    :return: a tuple with the bzr process handle for passing to
        finish_bzr_subprocess, and the base url for the server.
    """
    # Serve from the current directory
    command = ['serve', '--port', 'localhost:0'] + list(extra_options)
    process = self.start_bzr_subprocess(command, skip_if_plan_to_signal=True)

    banner = 'listening on port: '
    first_line = process.stdout.readline()
    self.assertStartsWith(first_line, banner)
    port_number = int(first_line[len(banner):])
    return process, 'bzr://localhost:%d/' % port_number
96
def test_bzr_serve_inet_readonly(self):
    """An --inet server started with no extra options must refuse mkdir.

    Calling ``mkdir('adir')`` on the transport has to raise
    errors.TransportNotPossible, after which the server is checked for a
    clean shutdown.
    """
    server_process, smart_transport = self.start_server_inet()
    self.assertRaises(
        errors.TransportNotPossible, smart_transport.mkdir, 'adir')
    self.assertInetServerShutsdownCleanly(server_process)
102
def test_bzr_serve_inet_readwrite(self):
    """A branch in the served directory can be opened over --inet.

    A branch is made in the current directory, ``['--allow-writes']`` is
    passed to start_server_inet, and the branch opened through the
    returned transport must report the null revision as its last revision.
    """
    self.make_branch('.')

    server_process, smart_transport = self.start_server_inet(
        ['--allow-writes'])

    # We get a working branch
    served_dir = BzrDir.open_from_transport(smart_transport)
    served_branch = served_dir.open_branch()
    served_branch.repository.get_revision_graph()
    tip = _mod_revision.ensure_null(served_branch.last_revision())
    self.assertEqual(_mod_revision.NULL_REVISION, tip)
    self.assertInetServerShutsdownCleanly(server_process)
115
def test_bzr_serve_port_readonly(self):
    """A --port server started with no extra options must refuse mkdir.

    A transport is obtained for the url returned by start_server_port;
    ``mkdir('adir')`` on it has to raise errors.TransportNotPossible, and
    the server must then finish cleanly when interrupted.
    """
    server_process, server_url = self.start_server_port()
    port_transport = get_transport(server_url)
    self.assertRaises(
        errors.TransportNotPossible, port_transport.mkdir, 'adir')
    self.assertServerFinishesCleanly(server_process)
122
def test_bzr_serve_port_readwrite(self):
    """A branch in the served directory can be opened over a --port server.

    A branch is made in the current directory, the server is started with
    ``--allow-writes``, and the branch opened from the server's url must
    report the null revision as its last revision.
    """
    self.make_branch('.')

    server_process, server_url = self.start_server_port(['--allow-writes'])

    # Connect to the server
    served_branch = Branch.open(server_url)

    # We get a working branch
    served_branch.repository.get_revision_graph()
    tip = _mod_revision.ensure_null(served_branch.last_revision())
    self.assertEqual(_mod_revision.NULL_REVISION, tip)

    self.assertServerFinishesCleanly(server_process)
138
# End-to-end check that opening a branch through a bzr+ssh:// URL makes bzr
# run ``<BZR_REMOTE_PATH> serve --inet ...`` on the remote side.
#
# Outline of what the visible code does:
#   * imports SFTPServer/StubServer, skipping the test when Paramiko is
#     missing;
#   * defines a stub SSH server that records and executes each command it is
#     asked to run, relaying bytes between the SSH channel and the child
#     process's stdin/stdout/stderr on separate threads;
#   * points BZR_REMOTE_PATH at this bzr, opens the branch over bzr+ssh,
#     reads from it and creates a directory through it;
#   * restores BZR_REMOTE_PATH and compares the recorded commands with the
#     expected ``serve --inet`` invocation.
#
# NOTE(review): this copy of the method is incomplete -- e.g. the ``except``
# below has no visible ``try``, ferry_bytes has no visible body, the opening
# of the ``file_functions`` list is absent and the final assertion is only
# partly present.  Confirm details against upstream before editing the code.
def test_bzr_connect_to_bzr_ssh(self):
139
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
141
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
144
from bzrlib.transport.sftp import SFTPServer
145
except ParamikoNotPresent:
146
raise TestSkipped('Paramiko not installed')
147
from bzrlib.tests.stub_sftp import StubServer
150
self.make_branch('a_branch')
152
# Start an SSH server
153
# Every command string the stub SSH server is asked to execute is appended
# here, so it can be checked at the end of the test.
self.command_executed = []
154
# XXX: This is horrible -- we define a really dumb SSH server that
155
# executes commands, and manage the hooking up of stdin/out/err to the
156
# SSH channel ourselves. Surely this has already been implemented
158
class StubSSHServer(StubServer):
162
def check_channel_exec_request(self, channel, command):
163
self.test.command_executed.append(command)
164
# The requested command is run through the shell with all three standard
# streams piped, so they can be relayed to and from the SSH channel.
proc = subprocess.Popen(
165
command, shell=True, stdin=subprocess.PIPE,
166
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
168
# XXX: horribly inefficient, not to mention ugly.
169
# Start a thread for each of stdin/out/err, and relay bytes from
170
# the subprocess to channel and vice versa.
171
def ferry_bytes(read, write, close):
180
# Each entry is a (read, write, close) triple handed to ferry_bytes:
# channel -> child stdin, child stdout -> channel, child stderr -> channel.
(channel.recv, proc.stdin.write, proc.stdin.close),
181
(proc.stdout.read, channel.sendall, channel.close),
182
(proc.stderr.read, channel.sendall_stderr, channel.close)]
183
for read, write, close in file_functions:
184
t = threading.Thread(
185
target=ferry_bytes, args=(read, write, close))
190
ssh_server = SFTPServer(StubSSHServer)
191
# XXX: We *don't* want to override the default SSH vendor, so we set
192
# _vendor to what _get_ssh_vendor returns.
194
self.addCleanup(ssh_server.tearDown)
195
port = ssh_server._listener.port
197
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
198
# variable is used to tell bzr what command to run on the remote end.
199
path_to_branch = osutils.abspath('a_branch')
201
# Remember the current value (None when unset) so it can be put back below.
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
202
bzr_remote_path = self.get_bzr_path()
203
# On win32 the bzr path is prefixed with the Python interpreter
# (presumably because the script cannot be executed directly there).
if sys.platform == 'win32':
204
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
205
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
207
# On win32 the drive prefix is dropped before the path is put in the URL.
if sys.platform == 'win32':
208
path_to_branch = os.path.splitdrive(path_to_branch)[1]
209
branch = Branch.open(
210
'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))
212
branch.repository.get_revision_graph()
213
self.assertEqual(_mod_revision.NULL_REVISION,
214
_mod_revision.ensure_null(branch.last_revision()))
215
# Check we can perform write operations
216
branch.bzrdir.root_transport.mkdir('foo')
218
# Restore the BZR_REMOTE_PATH environment variable back to its
220
if orig_bzr_remote_path is None:
221
del os.environ['BZR_REMOTE_PATH']
223
os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path
226
# Expected list of commands recorded by the stub SSH server (only part of
# this assertion is visible in this copy).
['%s serve --inet --directory=/ --allow-writes'
228
self.command_executed)