1
# Copyright (C) 2006 Canonical Ltd
3
# This program is free software; you can redistribute it and/or modify
4
# it under the terms of the GNU General Public License as published by
5
# the Free Software Foundation; either version 2 of the License, or
6
# (at your option) any later version.
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
# GNU General Public License for more details.
13
# You should have received a copy of the GNU General Public License
14
# along with this program; if not, write to the Free Software
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18
"""Tests of the bzr serve command."""
30
from bzrlib.branch import Branch
31
from bzrlib.bzrdir import BzrDir
32
from bzrlib.errors import ParamikoNotPresent
33
from bzrlib.tests import TestCaseWithTransport, TestSkipped
34
from bzrlib.transport import get_transport, smart
37
class TestBzrServe(TestCaseWithTransport):
39
def assertInetServerShutsdownCleanly(self, process):
40
"""Shutdown the server process looking for errors."""
41
# Shutdown the server: the server should shut down when it cannot read
44
# Hide stdin from the subprocess module, so it won't fail to close it.
46
# NOTE(review): the statements the two comments above describe are not
# visible in this listing -- confirm against the original file.
# The subprocess is required to finish with return code 0.
result = self.finish_bzr_subprocess(process, retcode=0)
47
# Both elements of the result must be empty strings (presumably the
# captured stdout and stderr -- verify against finish_bzr_subprocess).
self.assertEqual('', result[0])
48
self.assertEqual('', result[1])
50
def assertServerFinishesCleanly(self, process):
    """Interrupt a running bzr serve process and check how it finished.

    SIGINT is delivered through finish_bzr_subprocess, which is asked to
    expect return code 3.

    :param process: the bzr process handle to finish.
    """
    finished = self.finish_bzr_subprocess(
        process, send_signal=signal.SIGINT, retcode=3)
    # The first element must be empty; the second must hold exactly the
    # interrupt notice (presumably stdout and stderr respectively).
    self.assertEqual('', finished[0])
    self.assertEqual('bzr: interrupted\n', finished[1])
58
def start_server_inet(self, extra_options=()):
    """Start a bzr server subprocess using the --inet option.

    :param extra_options: extra options to give the server; they are
        appended to the ``serve --inet`` command line.
    :return: a tuple with the bzr process handle for passing to
        finish_bzr_subprocess, and a transport that talks to the server
        over the subprocess's stdout/stdin pipes.
    """
    # Serve from the current directory.  extra_options has to be forwarded
    # here: without it a caller asking for e.g. --allow-writes would
    # silently get a server started with the default options.
    args = ['serve', '--inet']
    args.extend(extra_options)
    process = self.start_bzr_subprocess(args)

    # Connect to the server
    # We use this url because while this is no valid URL to connect to this
    # server instance, the transport needs a URL.
    medium = smart.SmartSimplePipesClientMedium(
        process.stdout, process.stdin)
    transport = smart.SmartTransport('bzr://localhost/', medium=medium)
    return process, transport
76
def start_server_port(self, extra_options=()):
    """Launch ``bzr serve --port localhost:0`` and work out its URL.

    The port actually in use is parsed from the first line the server
    writes to its stdout.

    :param extra_options: extra options to give the server.
    :return: a tuple with the bzr process handle for passing to
        finish_bzr_subprocess, and the base url for the server.
    """
    # Serve from the current directory
    command = ['serve', '--port', 'localhost:0'] + list(extra_options)
    process = self.start_bzr_subprocess(command, skip_if_plan_to_signal=True)

    # The first output line must announce the port; everything after the
    # fixed banner is the port number.
    banner = 'listening on port: '
    first_line = process.stdout.readline()
    self.assertStartsWith(first_line, banner)
    port_number = int(first_line[len(banner):])
    return process, 'bzr://localhost:%d/' % port_number
93
def test_bzr_serve_inet_readonly(self):
    """Without extra options an --inet server must refuse mkdir."""
    started = self.start_server_inet()
    server_process, remote = started
    # Creating a directory over the transport has to be rejected.
    self.assertRaises(errors.TransportNotPossible, remote.mkdir, 'adir')
    self.assertInetServerShutsdownCleanly(server_process)
99
def test_bzr_serve_inet_readwrite(self):
    # Create the branch that the server will expose.
    self.make_branch('.')

    server_process, remote = self.start_server_inet(['--allow-writes'])

    # Open the branch through the server's transport and read from it;
    # it has no revisions yet.
    remote_bzrdir = BzrDir.open_from_transport(remote)
    served_branch = remote_bzrdir.open_branch()
    served_branch.repository.get_revision_graph()
    self.assertEqual(None, served_branch.last_revision())
    self.assertInetServerShutsdownCleanly(server_process)
111
def test_bzr_serve_port_readonly(self):
    """Without extra options a --port server must refuse mkdir."""
    server_process, base_url = self.start_server_port()
    remote = get_transport(base_url)
    # Creating a directory over the transport has to be rejected.
    self.assertRaises(errors.TransportNotPossible, remote.mkdir, 'adir')
    self.assertServerFinishesCleanly(server_process)
118
def test_bzr_serve_port_readwrite(self):
    # Create the branch that the server will expose.
    self.make_branch('.')

    server_process, base_url = self.start_server_port(['--allow-writes'])

    # Open the branch over the server's URL and read from it; it has no
    # revisions yet.
    served_branch = Branch.open(base_url)
    served_branch.repository.get_revision_graph()
    self.assertEqual(None, served_branch.last_revision())

    self.assertServerFinishesCleanly(server_process)
133
def test_bzr_connect_to_bzr_ssh(self):
# Overview of the visible steps: make a branch in 'a_branch'; build an
# SFTPServer from a StubSSHServer whose check_channel_exec_request records
# each command in self.command_executed and runs it with subprocess.Popen;
# set BZR_REMOTE_PATH from self.get_bzr_path(); open the branch through a
# bzr+ssh:// URL, read from it and create a directory via its transport;
# then put BZR_REMOTE_PATH back.
# NOTE(review): this listing has gaps -- the 'try:' matching the
# 'except ParamikoNotPresent:' below, the body of ferry_bytes, the 'else:'
# of the BZR_REMOTE_PATH restore and the opening of the final assertion
# are not visible. Confirm against the original file before relying on
# this summary.
134
"""User acceptance that get_transport of a bzr+ssh:// behaves correctly.
136
bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
139
from bzrlib.transport.sftp import SFTPServer
140
except ParamikoNotPresent:
141
raise TestSkipped('Paramiko not installed')
142
from bzrlib.tests.stub_sftp import StubServer
145
self.make_branch('a_branch')
147
# Start an SSH server
148
self.command_executed = []
149
# XXX: This is horrible -- we define a really dumb SSH server that
150
# executes commands, and manage the hooking up of stdin/out/err to the
151
# SSH channel ourselves. Surely this has already been implemented
153
class StubSSHServer(StubServer):
157
def check_channel_exec_request(self, channel, command):
158
self.test.command_executed.append(command)
159
proc = subprocess.Popen(
160
command, shell=True, stdin=subprocess.PIPE,
161
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
163
# XXX: horribly inefficient, not to mention ugly.
164
# Start a thread for each of stdin/out/err, and relay bytes from
165
# the subprocess to channel and vice versa.
166
def ferry_bytes(read, write, close):
175
(channel.recv, proc.stdin.write, proc.stdin.close),
176
(proc.stdout.read, channel.sendall, channel.close),
177
(proc.stderr.read, channel.sendall_stderr, channel.close)]
178
for read, write, close in file_functions:
179
t = threading.Thread(
180
target=ferry_bytes, args=(read, write, close))
185
ssh_server = SFTPServer(StubSSHServer)
186
# XXX: We *don't* want to override the default SSH vendor, so we set
187
# _vendor to what _get_ssh_vendor returns.
189
self.addCleanup(ssh_server.tearDown)
190
port = ssh_server._listener.port
192
# Access the branch via a bzr+ssh URL. The BZR_REMOTE_PATH environment
193
# variable is used to tell bzr what command to run on the remote end.
194
path_to_branch = osutils.abspath('a_branch')
196
orig_bzr_remote_path = os.environ.get('BZR_REMOTE_PATH')
197
bzr_remote_path = self.get_bzr_path()
198
if sys.platform == 'win32':
199
bzr_remote_path = sys.executable + ' ' + self.get_bzr_path()
200
os.environ['BZR_REMOTE_PATH'] = bzr_remote_path
202
if sys.platform == 'win32':
203
path_to_branch = os.path.splitdrive(path_to_branch)[1]
204
branch = Branch.open(
205
'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch))
207
branch.repository.get_revision_graph()
208
self.assertEqual(None, branch.last_revision())
209
# Check we can perform write operations
210
branch.bzrdir.root_transport.mkdir('foo')
212
# Restore the BZR_REMOTE_PATH environment variable back to its
214
if orig_bzr_remote_path is None:
215
del os.environ['BZR_REMOTE_PATH']
217
os.environ['BZR_REMOTE_PATH'] = orig_bzr_remote_path
220
['%s serve --inet --directory=/ --allow-writes'
222
self.command_executed)