~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_cleanup.py

  • Committer: John Arbash Meinel
  • Date: 2010-02-17 17:11:16 UTC
  • mfrom: (4797.2.17 2.1)
  • mto: (4797.2.18 2.1)
  • mto: This revision was merged to the branch mainline in revision 5055.
  • Revision ID: john@arbash-meinel.com-20100217171116-h7t9223ystbnx5h8
merge bzr.2.1 in preparation for NEWS entry.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import re
 
19
 
 
20
from bzrlib.cleanup import (
 
21
    _do_with_cleanups,
 
22
    _run_cleanup,
 
23
    OperationWithCleanups,
 
24
    )
 
25
from bzrlib.tests import TestCase
 
26
from bzrlib import (
 
27
    debug,
 
28
    trace,
 
29
    )
 
30
 
 
31
 
 
32
class CleanupsTestCase(TestCase):
 
33
 
 
34
    def setUp(self):
 
35
        super(CleanupsTestCase, self).setUp()
 
36
        self.call_log = []
 
37
 
 
38
    def no_op_cleanup(self):
 
39
        self.call_log.append('no_op_cleanup')
 
40
 
 
41
    def assertLogContains(self, regex):
 
42
        self.assertContainsRe(self.get_log(), regex, re.DOTALL)
 
43
 
 
44
    def failing_cleanup(self):
 
45
        self.call_log.append('failing_cleanup')
 
46
        raise Exception("failing_cleanup goes boom!")
 
47
 
 
48
 
 
49
class TestRunCleanup(CleanupsTestCase):
 
50
 
 
51
    def test_no_errors(self):
 
52
        """The function passed to _run_cleanup is run."""
 
53
        self.assertTrue(_run_cleanup(self.no_op_cleanup))
 
54
        self.assertEqual(['no_op_cleanup'], self.call_log)
 
55
 
 
56
    def test_cleanup_with_args_kwargs(self):
 
57
        def func_taking_args_kwargs(*args, **kwargs):
 
58
            self.call_log.append(('func', args, kwargs))
 
59
        _run_cleanup(func_taking_args_kwargs, 'an arg', kwarg='foo')
 
60
        self.assertEqual(
 
61
            [('func', ('an arg',), {'kwarg': 'foo'})], self.call_log)
 
62
 
 
63
    def test_cleanup_error(self):
 
64
        """An error from the cleanup function is logged by _run_cleanup, but not
 
65
        propagated.
 
66
 
 
67
        This is there's no way for _run_cleanup to know if there's an existing
 
68
        exception in this situation::
 
69
            try:
 
70
              some_func()
 
71
            finally:
 
72
              _run_cleanup(cleanup_func)
 
73
        So, the best _run_cleanup can do is always log errors but never raise
 
74
        them.
 
75
        """
 
76
        self.assertFalse(_run_cleanup(self.failing_cleanup))
 
77
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
78
 
 
79
    def test_cleanup_error_debug_flag(self):
 
80
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
 
81
        user.
 
82
        """
 
83
        log = StringIO()
 
84
        trace.push_log_file(log)
 
85
        debug.debug_flags.add('cleanup')
 
86
        self.assertFalse(_run_cleanup(self.failing_cleanup))
 
87
        self.assertContainsRe(
 
88
            log.getvalue(),
 
89
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
 
90
 
 
91
    def test_prior_error_cleanup_succeeds(self):
 
92
        """Calling _run_cleanup from a finally block will not interfere with an
 
93
        exception from the try block.
 
94
        """
 
95
        def failing_operation():
 
96
            try:
 
97
                1/0
 
98
            finally:
 
99
                _run_cleanup(self.no_op_cleanup)
 
100
        self.assertRaises(ZeroDivisionError, failing_operation)
 
101
        self.assertEqual(['no_op_cleanup'], self.call_log)
 
102
 
 
103
    def test_prior_error_cleanup_fails(self):
 
104
        """Calling _run_cleanup from a finally block will not interfere with an
 
105
        exception from the try block even when the cleanup itself raises an
 
106
        exception.
 
107
 
 
108
        The cleanup exception will be logged.
 
109
        """
 
110
        def failing_operation():
 
111
            try:
 
112
                1/0
 
113
            finally:
 
114
                _run_cleanup(self.failing_cleanup)
 
115
        self.assertRaises(ZeroDivisionError, failing_operation)
 
116
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
117
 
 
118
 
 
119
class TestDoWithCleanups(CleanupsTestCase):
 
120
 
 
121
    def trivial_func(self):
 
122
        self.call_log.append('trivial_func')
 
123
        return 'trivial result'
 
124
 
 
125
    def test_runs_func(self):
 
126
        """_do_with_cleanups runs the function it is given, and returns the
 
127
        result.
 
128
        """
 
129
        result = _do_with_cleanups([], self.trivial_func)
 
130
        self.assertEqual('trivial result', result)
 
131
 
 
132
    def test_runs_cleanups(self):
 
133
        """Cleanup functions are run (in the given order)."""
 
134
        cleanup_func_1 = (self.call_log.append, ('cleanup 1',), {})
 
135
        cleanup_func_2 = (self.call_log.append, ('cleanup 2',), {})
 
136
        _do_with_cleanups([cleanup_func_1, cleanup_func_2], self.trivial_func)
 
137
        self.assertEqual(
 
138
            ['trivial_func', 'cleanup 1', 'cleanup 2'], self.call_log)
 
139
 
 
140
    def failing_func(self):
 
141
        self.call_log.append('failing_func')
 
142
        1/0
 
143
 
 
144
    def test_func_error_propagates(self):
 
145
        """Errors from the main function are propagated (after running
 
146
        cleanups).
 
147
        """
 
148
        self.assertRaises(
 
149
            ZeroDivisionError, _do_with_cleanups,
 
150
            [(self.no_op_cleanup, (), {})], self.failing_func)
 
151
        self.assertEqual(['failing_func', 'no_op_cleanup'], self.call_log)
 
152
 
 
153
    def test_func_error_trumps_cleanup_error(self):
 
154
        """Errors from the main function a propagated even if a cleanup raises
 
155
        an error.
 
156
 
 
157
        The cleanup error is be logged.
 
158
        """
 
159
        self.assertRaises(
 
160
            ZeroDivisionError, _do_with_cleanups,
 
161
            [(self.failing_cleanup, (), {})], self.failing_func)
 
162
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
163
 
 
164
    def test_func_passes_and_error_from_cleanup(self):
 
165
        """An error from a cleanup is propagated when the main function doesn't
 
166
        raise an error.  Later cleanups are still executed.
 
167
        """
 
168
        exc = self.assertRaises(
 
169
            Exception, _do_with_cleanups,
 
170
            [(self.failing_cleanup, (), {}), (self.no_op_cleanup, (), {})],
 
171
            self.trivial_func)
 
172
        self.assertEqual('failing_cleanup goes boom!', exc.args[0])
 
173
        self.assertEqual(
 
174
            ['trivial_func', 'failing_cleanup', 'no_op_cleanup'],
 
175
            self.call_log)
 
176
 
 
177
    def test_multiple_cleanup_failures(self):
 
178
        """When multiple cleanups fail (as tends to happen when something has
 
179
        gone wrong), the first error is propagated, and subsequent errors are
 
180
        logged.
 
181
        """
 
182
        cleanups = self.make_two_failing_cleanup_funcs()
 
183
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
 
184
            self.trivial_func)
 
185
        self.assertLogContains('Cleanup failed:.*ErrorB')
 
186
        self.assertFalse('ErrorA' in self.get_log())
 
187
 
 
188
    def make_two_failing_cleanup_funcs(self):
 
189
        def raise_a():
 
190
            raise ErrorA('Error A')
 
191
        def raise_b():
 
192
            raise ErrorB('Error B')
 
193
        return [(raise_a, (), {}), (raise_b, (), {})]
 
194
 
 
195
    def test_multiple_cleanup_failures_debug_flag(self):
 
196
        log = StringIO()
 
197
        trace.push_log_file(log)
 
198
        debug.debug_flags.add('cleanup')
 
199
        cleanups = self.make_two_failing_cleanup_funcs()
 
200
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
 
201
            self.trivial_func)
 
202
        self.assertContainsRe(
 
203
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
 
204
        self.assertEqual(1, log.getvalue().count('bzr: warning:'),
 
205
                log.getvalue())
 
206
 
 
207
    def test_func_and_cleanup_errors_debug_flag(self):
 
208
        log = StringIO()
 
209
        trace.push_log_file(log)
 
210
        debug.debug_flags.add('cleanup')
 
211
        cleanups = self.make_two_failing_cleanup_funcs()
 
212
        self.assertRaises(ZeroDivisionError, _do_with_cleanups, cleanups,
 
213
            self.failing_func)
 
214
        self.assertContainsRe(
 
215
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error A\n")
 
216
        self.assertContainsRe(
 
217
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
 
218
        self.assertEqual(2, log.getvalue().count('bzr: warning:'))
 
219
 
 
220
    def test_func_may_mutate_cleanups(self):
 
221
        """The main func may mutate the cleanups before it returns.
 
222
        
 
223
        This allows a function to gradually add cleanups as it acquires
 
224
        resources, rather than planning all the cleanups up-front.  The
 
225
        OperationWithCleanups helper relies on this working.
 
226
        """
 
227
        cleanups_list = []
 
228
        def func_that_adds_cleanups():
 
229
            self.call_log.append('func_that_adds_cleanups')
 
230
            cleanups_list.append((self.no_op_cleanup, (), {}))
 
231
            return 'result'
 
232
        result = _do_with_cleanups(cleanups_list, func_that_adds_cleanups)
 
233
        self.assertEqual('result', result)
 
234
        self.assertEqual(
 
235
            ['func_that_adds_cleanups', 'no_op_cleanup'], self.call_log)
 
236
 
 
237
    def test_cleanup_error_debug_flag(self):
 
238
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
 
239
        user.
 
240
        """
 
241
        log = StringIO()
 
242
        trace.push_log_file(log)
 
243
        debug.debug_flags.add('cleanup')
 
244
        self.assertRaises(ZeroDivisionError, _do_with_cleanups,
 
245
            [(self.failing_cleanup, (), {})], self.failing_func)
 
246
        self.assertContainsRe(
 
247
            log.getvalue(),
 
248
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
 
249
        self.assertEqual(1, log.getvalue().count('bzr: warning:'))
 
250
 
 
251
 
 
252
class ErrorA(Exception): pass
 
253
class ErrorB(Exception): pass
 
254
 
 
255
 
 
256
class TestOperationWithCleanups(CleanupsTestCase):
 
257
 
 
258
    def test_cleanup_ordering(self):
 
259
        """Cleanups are added in LIFO order.
 
260
 
 
261
        So cleanups added before run is called are run last, and the last
 
262
        cleanup added during the func is run first.
 
263
        """
 
264
        call_log = []
 
265
        def func(op, foo):
 
266
            call_log.append(('func called', foo))
 
267
            op.add_cleanup(call_log.append, 'cleanup 2')
 
268
            op.add_cleanup(call_log.append, 'cleanup 1')
 
269
            return 'result'
 
270
        owc = OperationWithCleanups(func)
 
271
        owc.add_cleanup(call_log.append, 'cleanup 4')
 
272
        owc.add_cleanup(call_log.append, 'cleanup 3')
 
273
        result = owc.run('foo')
 
274
        self.assertEqual('result', result)
 
275
        self.assertEqual(
 
276
            [('func called', 'foo'), 'cleanup 1', 'cleanup 2', 'cleanup 3',
 
277
            'cleanup 4'], call_log)
 
278