# Copyright (C) 2006 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

"""Tests of the bzr serve command."""

import os
import signal
import subprocess
import threading

from bzrlib import errors
from bzrlib.branch import Branch
from bzrlib.bzrdir import BzrDir
from bzrlib.errors import ParamikoNotPresent
from bzrlib.tests import TestCaseWithTransport, TestSkipped
from bzrlib.transport import get_transport, smart


class TestBzrServe(TestCaseWithTransport):
    """Blackbox tests that run ``bzr serve`` in a subprocess and talk to it."""

    def assertInetServerShutsdownCleanly(self, process):
        """Shutdown the server process looking for errors.

        :param process: the bzr process handle returned by start_server_inet.
        """
        # Shutdown the server: the server should shut down when it cannot read
        # from stdin anymore.
        process.stdin.close()
        # Hide stdin from the subprocess module, so it won't fail to close it.
        process.stdin = None
        result = self.finish_bzr_subprocess(process, retcode=0)
        # Both captured outputs must be empty for a clean shutdown.
        self.assertEqual('', result[0])
        self.assertEqual('', result[1])

    def assertServerFinishesCleanly(self, process):
        """Shutdown the bzr serve instance process looking for errors.

        The process is sent SIGINT and is expected to finish with return
        code 3, an empty ``result[0]`` and the interrupt notice in
        ``result[1]`` (presumably stdout and stderr respectively).

        :param process: the bzr process handle returned by start_server_port.
        """
        # Shutdown the server
        result = self.finish_bzr_subprocess(process, retcode=3,
                                            send_signal=signal.SIGINT)
        self.assertEqual('', result[0])
        self.assertEqual('bzr: interrupted\n', result[1])

    def start_server_inet(self, extra_options=()):
        """Start a bzr server subprocess using the --inet option.

        :param extra_options: extra options to give the server.
        :return: a tuple with the bzr process handle for passing to
            finish_bzr_subprocess, and a transport talking to the server
            over the subprocess' stdin/stdout pipes.
        """
        # Serve from the current directory.  extra_options must be forwarded,
        # otherwise callers asking for e.g. --allow-writes silently get the
        # default server.
        args = ['serve', '--inet']
        args.extend(extra_options)
        process = self.start_bzr_subprocess(args)

        # Connect to the server
        # We use this url because while this is no valid URL to connect to this
        # server instance, the transport needs a URL.
        medium = smart.SmartSimplePipesClientMedium(
            process.stdout, process.stdin)
        transport = smart.SmartTransport('bzr://localhost/', medium=medium)
        return process, transport

    def start_server_port(self, extra_options=()):
        """Start a bzr server subprocess.

        :param extra_options: extra options to give the server.
        :return: a tuple with the bzr process handle for passing to
            finish_bzr_subprocess, and the base url for the server.
        """
        # Serve from the current directory
        args = ['serve', '--port', 'localhost:0']
        args.extend(extra_options)
        process = self.start_bzr_subprocess(args, skip_if_plan_to_signal=True)
        # The server announces the port it actually bound to on the first
        # line of its stdout; parse it to build the URL.
        port_line = process.stdout.readline()
        prefix = 'listening on port: '
        self.assertStartsWith(port_line, prefix)
        port = int(port_line[len(prefix):])
        return process, 'bzr://localhost:%d/' % port

    def test_bzr_serve_inet_readonly(self):
        """bzr server should provide a read only filesystem by default."""
        process, transport = self.start_server_inet()
        self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
        self.assertInetServerShutsdownCleanly(process)

    def test_bzr_serve_inet_readwrite(self):
        """A branch can be opened through an --inet --allow-writes server."""
        # Make a branch
        self.make_branch('.')

        process, transport = self.start_server_inet(['--allow-writes'])

        # We get a working branch
        branch = BzrDir.open_from_transport(transport).open_branch()
        branch.repository.get_revision_graph()
        self.assertEqual(None, branch.last_revision())
        self.assertInetServerShutsdownCleanly(process)

    def test_bzr_serve_port_readonly(self):
        """bzr server should provide a read only filesystem by default."""
        process, url = self.start_server_port()
        transport = get_transport(url)
        self.assertRaises(errors.TransportNotPossible, transport.mkdir, 'adir')
        self.assertServerFinishesCleanly(process)

    def test_bzr_serve_port_readwrite(self):
        """A branch can be opened through a --port --allow-writes server."""
        # Make a branch
        self.make_branch('.')

        process, url = self.start_server_port(['--allow-writes'])

        # Connect to the server
        branch = Branch.open(url)

        # We get a working branch
        branch.repository.get_revision_graph()
        self.assertEqual(None, branch.last_revision())

        self.assertServerFinishesCleanly(process)

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        """
        try:
            from bzrlib.transport.sftp import SFTPServer
        except ParamikoNotPresent:
            raise TestSkipped('Paramiko not installed')
        from bzrlib.tests.stub_sftp import StubServer

        # Make a branch
        self.make_branch('a_branch')

        # Start an SSH server
        self.command_executed = []
        # XXX: This is horrible -- we define a really dumb SSH server that
        # executes commands, and manage the hooking up of stdin/out/err to the
        # SSH channel ourselves. Surely this has already been implemented
        # elsewhere?
        class StubSSHServer(StubServer):

            # Lets the server instance record commands on the running test.
            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                proc = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # Start a thread for each of stdin/out/err, and relay bytes from
                # the subprocess to channel and vice versa.
                def ferry_bytes(read, write, close):
                    # NOTE(review): relays one byte at a time so a read never
                    # blocks waiting to fill a larger buffer -- confirm this
                    # matches the intended relay loop.
                    while True:
                        data = read(1)
                        if not data:
                            # End of stream: close the receiving side too.
                            close()
                            break
                        write(data)

                file_functions = [
                    (channel.recv, proc.stdin.write, proc.stdin.close),
                    (proc.stdout.read, channel.sendall, channel.close),
                    (proc.stderr.read, channel.sendall_stderr, channel.close)]
                for read, write, close in file_functions:
                    t = threading.Thread(
                        target=ferry_bytes, args=(read, write, close))
                    t.start()

                # A true result tells the SSH layer the exec request was
                # accepted.
                return True

        ssh_server = SFTPServer(StubSSHServer)
        # XXX: We *don't* want to override the default SSH vendor, so we set
        # _vendor to what _get_ssh_vendor returns.
        # NOTE(review): setUp() is assumed to be the start hook paired with
        # the tearDown registered below; nothing here sets _vendor -- confirm.
        ssh_server.setUp()
        self.addCleanup(ssh_server.tearDown)
        port = ssh_server._listener.port

        # Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
        # variable is used to tell bzr what command to run on the remote end.
        path_to_branch = os.path.abspath('a_branch')

        orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
        os.environ['BZR_REMOTE_PATH'] = self.get_bzr_path()
        try:
            branch = Branch.open(
                'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))

            branch.repository.get_revision_graph()
            self.assertEqual(None, branch.last_revision())
            # Check we can perform write operations
            branch.bzrdir.root_transport.mkdir('foo')
        finally:
            # Restore the BZR_REMOTE_PATH environment variable back to its
            # original state, even when the assertions above fail.
            if orig_bzr_remote_path is None:
                del os.environ['BZR_REMOTE_PATH']
            else:
                os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path

        # Exactly one command must have been run on the "remote" end.
        self.assertEqual(
            ['%s serve --inet --directory=/ --allow-writes'
             % self.get_bzr_path()],
            self.command_executed)