1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# Copyright (C) 2007 Canonical Ltd
# Authors: Robert Collins <robert.collins@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Support for running strace against the current process."""
import errno
import os
import signal
import subprocess
import tempfile
from bzrlib import errors
# This is currently test-focused, so importing bzrlib.tests is ok. We might
# want to move Feature to its own module though.
from bzrlib.tests import Feature
def strace(function, *args, **kwargs):
    """Run function(*args, **kwargs) under strace.

    Convenience wrapper that packs the positional and keyword arguments and
    hands them to strace_detailed, leaving its other options at their
    defaults.

    :param function: The callable to run while strace is attached.
    :return: a tuple: function-result, a StraceResult.
    """
    traced = strace_detailed(function, args, kwargs)
    return traced
def strace_detailed(function, args, kwargs, follow_children=True):
    """Run function(*args, **kwargs) with strace attached to this process.

    strace is started as a child process attached to the current pid, with
    its trace written to a temporary file and its stderr captured in a second
    temporary file.  Once the function has finished (or raised), strace is
    sent SIGQUIT and reaped, and both temporary files are closed.

    :param function: The callable to run while strace is attached.
    :param args: Sequence of positional arguments for function.
    :param kwargs: Dict of keyword arguments for function.
    :param follow_children: If True, pass '-f' to strace so that forks are
        followed as well.
    :return: a tuple: function-result, a StraceResult.
    :raises StraceError: if strace's stderr output starts with the ptrace
        attach failure message.
    """
    # FIXME: strace is buggy
    # (https://bugs.launchpad.net/ubuntu/+source/strace/+bug/103133) and the
    # test suite hangs if the '-f' is given to strace *and* more than one
    # thread is running. Using follow_children=False allows the test suite to
    # disable fork following to work around the bug.

    # capture strace output to a file
    log_file = tempfile.NamedTemporaryFile()
    err_file = tempfile.NamedTemporaryFile()
    try:
        # start strace
        strace_cmd = ['strace', '-r', '-tt', '-p', str(os.getpid()),
                      '-o', log_file.name]
        if follow_children:
            strace_cmd.append('-f')
        # need to catch both stdout and stderr to work around
        # bug 627208
        proc = subprocess.Popen(strace_cmd,
                                stdout=subprocess.PIPE,
                                stderr=err_file.fileno())
        try:
            # Wait for strace to attach: this blocks until strace writes a
            # line to stdout or closes it.  The text itself is not needed.
            proc.stdout.readline()
            # Run the function to strace
            result = function(*args, **kwargs)
        finally:
            # stop strace, even if the traced function raised, so that the
            # child process is never left attached and unreaped.
            os.kill(proc.pid, signal.SIGQUIT)
            proc.communicate()
        # grab the log
        log_file.seek(0)
        log = log_file.read()
        # and stderr
        err_file.seek(0)
        err_messages = err_file.read()
    finally:
        log_file.close()
        err_file.close()
    # and read any errors
    if err_messages.startswith("attach: ptrace(PTRACE_ATTACH,"):
        raise StraceError(err_messages=err_messages)
    return result, StraceResult(log, err_messages)
class StraceError(errors.BzrError):
    """Error raised when strace reports a failure.

    Constructed with an ``err_messages`` keyword holding the text strace
    wrote to stderr.
    """

    # Message template; presumably interpolated by errors.BzrError with the
    # keyword arguments given to the constructor -- confirm against bzrlib.
    _fmt = "strace failed: %(err_messages)s"
class StraceResult(object):
    """Holds what strace produced while a function was being traced."""

    def __init__(self, raw_log, err_messages):
        """Store the outputs of an strace run.

        :param raw_log: The output that strace created.
        :param err_messages: The error messages that accompanied the run.
        """
        self.raw_log, self.err_messages = raw_log, err_messages
class _StraceFeature(Feature):
def _probe(self):
try:
proc = subprocess.Popen(['strace'],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
proc.communicate()
return True
except OSError, e:
if e.errno == errno.ENOENT:
# strace is not installed
return False
else:
raise
def feature_name(self):
return 'strace'
StraceFeature = _StraceFeature()
|