68
69
OVERRIDE_PYTHON = None # to run with alternative python 'python'
72
def apply_redirected(self, stdin=None, stdout=None, stderr=None,
73
a_callable=None, *args, **kwargs):
74
"""Call callable with redirected std io pipes.
76
Returns the return code."""
77
from StringIO import StringIO
78
if not callable(a_callable):
79
raise ValueError("a_callable must be callable.")
83
stdout = self.TEST_LOG
85
stderr = self.TEST_LOG
86
real_stdin = sys.stdin
87
real_stdout = sys.stdout
88
real_stderr = sys.stderr
94
result = a_callable(*args, **kwargs)
96
sys.stdout = real_stdout
97
sys.stderr = real_stderr
98
sys.stdin = real_stdin
75
102
super(TestCase, self).setUp()
76
# save stdout & stderr so there's no leakage from code-under-test
77
self.real_stdout = sys.stdout
78
self.real_stderr = sys.stderr
79
sys.stdout = sys.stderr = TestCase.TEST_LOG
103
# setup a temporary log for the test
105
self.TEST_LOG = tempfile.NamedTemporaryFile(mode='wt', bufsize=0)
80
106
self.log("%s setup" % self.id())
82
108
def tearDown(self):
83
sys.stdout = self.real_stdout
84
sys.stderr = self.real_stderr
85
109
self.log("%s teardown" % self.id())
87
111
super(TestCase, self).tearDown()
114
"""Log a message to a progress file"""
115
print >>self.TEST_LOG, msg
117
def check_inventory_shape(self, inv, shape):
119
Compare an inventory to a list of expected names.
121
Fail if they are not precisely equal.
124
shape = list(shape) # copy
125
for path, ie in inv.entries():
126
name = path.replace('\\', '/')
134
self.fail("expected paths not found in inventory: %r" % shape)
136
self.fail("unexpected paths found in inventory: %r" % extras)
139
"""Get the log the test case used. This can only be called once,
140
after which an exception will be raised.
142
self.TEST_LOG.flush()
143
log = open(self.TEST_LOG.name, 'rt').read()
144
self.TEST_LOG.close()
148
class FunctionalTestCase(TestCase):
149
"""Base class for tests that perform function testing - running bzr,
150
using files on disk, and similar activities.
152
InTempDir is an old alias for FunctionalTestCase.
158
def check_file_contents(self, filename, expect):
159
self.log("check contents of file %s" % filename)
160
contents = file(filename, 'r').read()
161
if contents != expect:
162
self.log("expected: %r" % expect)
163
self.log("actually: %r" % contents)
164
self.fail("contents of %s not as expected")
166
def _make_test_root(self):
171
if FunctionalTestCase.TEST_ROOT is not None:
173
FunctionalTestCase.TEST_ROOT = os.path.abspath(
174
tempfile.mkdtemp(suffix='.tmp',
175
prefix=self._TEST_NAME + '-',
178
# make a fake bzr directory there to prevent any tests propagating
179
# up onto the source directory's real branch
180
os.mkdir(os.path.join(FunctionalTestCase.TEST_ROOT, '.bzr'))
183
super(FunctionalTestCase, self).setUp()
185
self._make_test_root()
186
self._currentdir = os.getcwdu()
187
self.test_dir = os.path.join(self.TEST_ROOT, self.id())
188
os.mkdir(self.test_dir)
189
os.chdir(self.test_dir)
193
os.chdir(self._currentdir)
194
super(FunctionalTestCase, self).tearDown()
89
196
def formcmd(self, cmd):
90
197
if isinstance(cmd, basestring):
93
199
if cmd[0] == 'bzr':
94
200
cmd[0] = self.BZRPATH
95
201
if self.OVERRIDE_PYTHON:
96
202
cmd.insert(0, self.OVERRIDE_PYTHON)
98
203
self.log('$ %r' % cmd)
103
206
def runcmd(self, cmd, retcode=0):
104
207
"""Run one command and check the return code.
134
232
except ImportError, e:
135
233
_need_subprocess()
138
235
cmd = self.formcmd(cmd)
139
236
child = Popen(cmd, stdout=PIPE, stderr=self.TEST_LOG)
140
237
outd, errd = child.communicate()
142
239
actual_retcode = child.wait()
144
240
outd = outd.replace('\r', '')
146
241
if retcode != actual_retcode:
147
242
raise CommandFailed("test failed: %r returned %d, expected %d"
148
243
% (cmd, actual_retcode, retcode))
154
246
def build_tree(self, shape):
155
247
"""Build a test tree according to a pattern.
169
261
f = file(name, 'wt')
170
262
print >>f, "contents of", name
175
"""Log a message to a progress file"""
176
# XXX: The problem with this is that code that writes straight
177
# to the log file won't be shown when we display the log
178
# buffer; would be better to not have the in-memory buffer and
179
# instead just a log file per test, which is read in and
180
# displayed if the test fails. That seems to imply one log
181
# per test case, not globally. OK?
182
self._log_buf = self._log_buf + str(msg) + '\n'
183
print >>self.TEST_LOG, msg
186
def check_inventory_shape(self, inv, shape):
188
Compare an inventory to a list of expected names.
190
Fail if they are not precisely equal.
193
shape = list(shape) # copy
194
for path, ie in inv.entries():
195
name = path.replace('\\', '/')
203
self.fail("expected paths not found in inventory: %r" % shape)
205
self.fail("unexpected paths found in inventory: %r" % extras)
208
def check_file_contents(self, filename, expect):
209
self.log("check contents of file %s" % filename)
210
contents = file(filename, 'r').read()
211
if contents != expect:
212
self.log("expected: %r" % expect)
213
self.log("actually: %r" % contents)
214
self.fail("contents of %s not as expected")
218
class InTempDir(TestCase):
219
"""Base class for tests run in a temporary branch."""
221
super(InTempDir, self).setUp()
223
self.test_dir = os.path.join(self.TEST_ROOT, self.__class__.__name__)
224
os.mkdir(self.test_dir)
225
os.chdir(self.test_dir)
229
os.chdir(self.TEST_ROOT)
230
super(InTempDir, self).tearDown()
265
InTempDir = FunctionalTestCase
268
class EarlyStoppingTestResultAdapter(object):
269
"""An adapter for TestResult to stop at the first first failure or error"""
271
def __init__(self, result):
272
self._result = result
274
def addError(self, test, err):
275
self._result.addError(test, err)
278
def addFailure(self, test, err):
279
self._result.addFailure(test, err)
282
def __getattr__(self, name):
283
return getattr(self._result, name)
285
def __setattr__(self, name, value):
286
if name == '_result':
287
object.__setattr__(self, name, value)
288
return setattr(self._result, name, value)
233
291
class _MyResult(unittest._TextTestResult):
237
295
No special behaviour for now.
239
def __init__(self, out, style):
240
super(_MyResult, self).__init__(out, False, 0)
242
assert style in ('none', 'progress', 'verbose')
245
298
def startTest(self, test):
246
super(_MyResult, self).startTest(test)
299
unittest.TestResult.startTest(self, test)
247
300
# TODO: Maybe show test.shortDescription somewhere?
249
302
# python2.3 has the bad habit of just "runit" for doctests
250
303
if what == 'runit':
251
304
what = test.shortDescription()
252
if self.style == 'verbose':
253
print >>self.out, '%-60.60s' % what,
306
self.stream.write('%-60.60s' % what)
256
309
def addError(self, test, err):
257
if self.style == 'verbose':
258
print >>self.out, 'ERROR'
259
elif self.style == 'progress':
260
self.stream.write('E')
262
310
super(_MyResult, self).addError(test, err)
264
313
def addFailure(self, test, err):
265
if self.style == 'verbose':
266
print >>self.out, 'FAILURE'
267
elif self.style == 'progress':
268
self.stream.write('F')
270
314
super(_MyResult, self).addFailure(test, err)
272
317
def addSuccess(self, test):
273
if self.style == 'verbose':
274
print >>self.out, 'OK'
275
elif self.style == 'progress':
319
self.stream.writeln('OK')
276
321
self.stream.write('~')
277
322
self.stream.flush()
278
super(_MyResult, self).addSuccess(test)
280
def printErrors(self):
281
if self.style == 'progress':
282
self.stream.writeln()
283
super(_MyResult, self).printErrors()
323
unittest.TestResult.addSuccess(self, test)
285
325
def printErrorList(self, flavour, errors):
286
326
for test, err in errors:
291
331
if isinstance(test, TestCase):
292
332
self.stream.writeln()
293
333
self.stream.writeln('log from this test:')
294
print >>self.stream, test._log_buf
297
class TestSuite(unittest.TestSuite):
299
def __init__(self, tests=(), name='test'):
300
super(TestSuite, self).__init__(tests)
303
def run(self, result):
308
self._setup_test_log()
309
self._setup_test_dir()
312
return super(TestSuite,self).run(result)
314
def _setup_test_log(self):
318
log_filename = os.path.abspath(self._name + '.log')
320
TestCase.TEST_LOG = open(log_filename, 'wt', buffering=1)
322
print >>TestCase.TEST_LOG, "tests run at " + time.ctime()
323
print '%-30s %s' % ('test log', log_filename)
325
def _setup_test_dir(self):
329
TestCase.ORIG_DIR = os.getcwdu()
330
TestCase.TEST_ROOT = os.path.abspath(self._name + '.tmp')
332
print '%-30s %s' % ('running tests in', TestCase.TEST_ROOT)
334
if os.path.exists(TestCase.TEST_ROOT):
335
shutil.rmtree(TestCase.TEST_ROOT)
336
os.mkdir(TestCase.TEST_ROOT)
337
os.chdir(TestCase.TEST_ROOT)
339
# make a fake bzr directory there to prevent any tests propagating
340
# up onto the source directory's real branch
341
os.mkdir(os.path.join(TestCase.TEST_ROOT, '.bzr'))
334
print >>self.stream, test._get_log()
344
337
class TextTestRunner(unittest.TextTestRunner):
346
def __init__(self, stream=sys.stderr, descriptions=1, verbosity=0, style='progress'):
347
super(TextTestRunner, self).__init__(stream, descriptions, verbosity)
350
339
def _makeResult(self):
351
return _MyResult(self.stream, self.style)
353
# If we want the old 4 line summary output (count, 0 failures, 0 errors)
354
# we can override run() too.
357
def run_suite(a_suite, name='test', verbose=False):
358
suite = TestSuite((a_suite,),name)
340
result = _MyResult(self.stream, self.descriptions, self.verbosity)
341
return EarlyStoppingTestResultAdapter(result)
344
class filteringVisitor(TestUtil.TestVisitor):
345
"""I accruse all the testCases I visit that pass a regexp filter on id
349
def __init__(self, filter):
351
TestUtil.TestVisitor.__init__(self)
353
self.filter=re.compile(filter)
356
"""answer the suite we are building"""
357
if self._suite is None:
358
self._suite=TestUtil.TestSuite()
361
def visitCase(self, aCase):
362
if self.filter.match(aCase.id()):
363
self.suite().addTest(aCase)
366
def run_suite(suite, name='test', verbose=False, pattern=".*"):
368
FunctionalTestCase._TEST_NAME = name
363
runner = TextTestRunner(stream=sys.stdout, style=style)
364
result = runner.run(suite)
373
runner = TextTestRunner(stream=sys.stdout,
376
visitor = filteringVisitor(pattern)
378
result = runner.run(visitor.suite())
379
# This is still a little bogus,
380
# but only a little. Folk not using our testrunner will
381
# have to delete their temp directories themselves.
382
if result.wasSuccessful():
383
if FunctionalTestCase.TEST_ROOT is not None:
384
shutil.rmtree(FunctionalTestCase.TEST_ROOT)
386
print "Failed tests working directories are in '%s'\n" % FunctionalTestCase.TEST_ROOT
365
387
return result.wasSuccessful()