1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Tests of the bzr serve command."""
29
revision as _mod_revision,
31
from bzrlib.branch import Branch
32
from bzrlib.bzrdir import BzrDir
33
from bzrlib.errors import ParamikoNotPresent
34
from bzrlib.smart import medium
35
from bzrlib.tests import TestCaseWithTransport, TestSkipped
36
from bzrlib.transport import get_transport, remote
39
class TestBzrServe(TestCaseWithTransport):
41
def assertInetServerShutsdownCleanly(self, process):
42
"""Shutdown the server process looking for errors."""
43
# Shutdown the server: the server should shut down when it cannot read
46
# Hide stdin from the subprocess module, so it won't fail to close it.
48
result = self.finish_bzr_subprocess(process)
49
self.assertEqual('', result[0])
50
self.assertEqual('', result[1])
52
def assertServerFinishesCleanly(self, process):
53
"""Shutdown the bzr serve instance process looking for errors."""
55
result = self.finish_bzr_subprocess(process, retcode=3,
56
send_signal=signal.SIGINT)
57
self.assertEqual('', result[0])
58
self.assertEqual('bzr: interrupted\n', result[1])
60
def make_read_requests(self, branch):
61
"""Do some read only requests."""
64
branch.repository.all_revision_ids()
65
self.assertEqual(_mod_revision.NULL_REVISION,
66
_mod_revision.ensure_null(branch.last_revision()))
70
def start_server_inet(self, extra_options=()):
71
"""Start a bzr server subprocess using the --inet option.
73
:param extra_options: extra options to give the server.
74
:return: a tuple with the bzr process handle for passing to
75
finish_bzr_subprocess, a client for the server, and a transport.
77
# Serve from the current directory
78
process = self.start_bzr_subprocess(['serve', '--inet'])
80
# Connect to the server
81
# We use this url because while this is no valid URL to connect to this
82
# server instance, the transport needs a URL.
83
client_medium = medium.SmartSimplePipesClientMedium(
84
process.stdout, process.stdin)
85
transport = remote.RemoteTransport(
86
'bzr://localhost/', medium=client_medium)
87
return process, transport
89
def start_server_port(self, extra_options=()):
90
"""Start a bzr server subprocess.
92
:param extra_options: extra options to give the server.
93
:return: a tuple with the bzr process handle for passing to
94
finish_bzr_subprocess, and the base url for the server.
96
# Serve from the current directory
97
args = ['serve', '--port', 'localhost:0']
98
args.extend(extra_options)
99
process = self.start_bzr_subprocess(args, skip_if_plan_to_signal=True)
100
port_line = process.stdout.readline()
101
prefix = 'listening on port: '
102
self.assertStartsWith(port_line, prefix)
103
port = int(port_line[len(prefix):])
104
return process,'bzr://localhost:%d/' % port
106
def test_bzr_serve_inet_readonly(self):
107
"""bzr server should provide a read only filesystem by default."""
108
process, transport = self.start_server_inet()
109
self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
110
self.assertInetServerShutsdownCleanly(process)
112
def test_bzr_serve_inet_readwrite(self):
114
self.make_branch('.')
116
process, transport = self.start_server_inet(['--allow-writes'])
118
# We get a working branch
119
branch = BzrDir.open_from_transport(transport).open_branch()
120
self.make_read_requests(branch)
121
self.assertInetServerShutsdownCleanly(process)
123
def test_bzr_serve_port_readonly(self):
124
"""bzr server should provide a read only filesystem by default."""
125
process, url = self.start_server_port()
126
transport = get_transport(url)
127
self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
128
self.assertServerFinishesCleanly(process)
130
def test_bzr_serve_port_readwrite(self):
132
self.make_branch('.')
134
process, url = self.start_server_port(['--allow-writes'])
136
# Connect to the server
137
branch = Branch.open(url)
138
self.make_read_requests(branch)
139
self.assertServerFinishesCleanly(process)
141
def test_bzr_connect_to_bzr_ssh(self):
142
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
144
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
147
from bzrlib.transport.sftp import SFTPServer
148
except ParamikoNotPresent:
149
raise TestSkipped('Paramiko not installed')
150
from bzrlib.tests.stub_sftp import StubServer
153
self.make_branch('a_branch')
155
# Start an SSH server
156
self.command_executed = []
157
# XXX: This is horrible -- we define a really dumb SSH server that
158
# executes commands, and manage the hooking up of stdin/out/err to the
159
# SSH channel ourselves. Surely this has already been implemented
161
class StubSSHServer(StubServer):
165
def check_channel_exec_request(self, channel, command):
166
self.test.command_executed.append(command)
167
proc = subprocess.Popen(
168
command, shell=True, stdin=subprocess.PIPE,
169
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
171
# XXX: horribly inefficient, not to mention ugly.
172
# Start a thread for each of stdin/out/err, and relay bytes from
173
# the subprocess to channel and vice versa.
174
def ferry_bytes(read, write, close):
183
(channel.recv, proc.stdin.write, proc.stdin.close),
184
(proc.stdout.read, channel.sendall, channel.close),
185
(proc.stderr.read, channel.sendall_stderr, channel.close)]
186
for read, write, close in file_functions:
187
t = threading.Thread(
188
target=ferry_bytes, args=(read, write, close))
193
ssh_server = SFTPServer(StubSSHServer)
194
# XXX: We *don't* want to override the default SSH vendor, so we set
195
# _vendor to what _get_ssh_vendor returns.
197
self.addCleanup(ssh_server.tearDown)
198
port = ssh_server._listener.port
200
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
201
# variable is used to tell bzr what command to run on the remote end.
202
path_to_branch = osutils.abspath('a_branch')
204
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
205
bzr_remote_path = self.get_bzr_path()
206
if sys.platform == 'win32':
207
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
208
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
210
if sys.platform == 'win32':
211
path_to_branch = os.path.splitdrive(path_to_branch)[1]
212
branch = Branch.open(
213
'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))
214
self.make_read_requests(branch)
215
# Check we can perform write operations
216
branch.bzrdir.root_transport.mkdir('foo')
218
# Restore the BZR_REMOTE_PATH environment variable back to its
220
if orig_bzr_remote_path is None:
221
del os.environ['BZR_REMOTE_PATH']
223
os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path
226
['%s serve --inet --directory=/ --allow-writes'
228
self.command_executed)