~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_cleanup.py

  • Committer: Andrew Bennetts
  • Date: 2010-09-13 06:36:59 UTC
  • mfrom: (5050.17.16 2.2)
  • mto: This revision was merged to the branch mainline in revision 5419.
  • Revision ID: andrew.bennetts@canonical.com-20100913063659-gs1d1xnsdbj59sx6
Merge lp:bzr/2.2, including fixes for #619872, #631350, #633745.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2009, 2010 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
from cStringIO import StringIO
 
18
import re
 
19
 
 
20
from bzrlib.cleanup import (
 
21
    _do_with_cleanups,
 
22
    _run_cleanup,
 
23
    ObjectWithCleanups,
 
24
    OperationWithCleanups,
 
25
    )
 
26
from bzrlib.tests import TestCase
 
27
from bzrlib import (
 
28
    debug,
 
29
    trace,
 
30
    )
 
31
 
 
32
 
 
33
class CleanupsTestCase(TestCase):
 
34
 
 
35
    def setUp(self):
 
36
        super(CleanupsTestCase, self).setUp()
 
37
        self.call_log = []
 
38
 
 
39
    def no_op_cleanup(self):
 
40
        self.call_log.append('no_op_cleanup')
 
41
 
 
42
    def assertLogContains(self, regex):
 
43
        self.assertContainsRe(self.get_log(), regex, re.DOTALL)
 
44
 
 
45
    def failing_cleanup(self):
 
46
        self.call_log.append('failing_cleanup')
 
47
        raise Exception("failing_cleanup goes boom!")
 
48
 
 
49
 
 
50
class TestRunCleanup(CleanupsTestCase):
 
51
 
 
52
    def test_no_errors(self):
 
53
        """The function passed to _run_cleanup is run."""
 
54
        self.assertTrue(_run_cleanup(self.no_op_cleanup))
 
55
        self.assertEqual(['no_op_cleanup'], self.call_log)
 
56
 
 
57
    def test_cleanup_with_args_kwargs(self):
 
58
        def func_taking_args_kwargs(*args, **kwargs):
 
59
            self.call_log.append(('func', args, kwargs))
 
60
        _run_cleanup(func_taking_args_kwargs, 'an arg', kwarg='foo')
 
61
        self.assertEqual(
 
62
            [('func', ('an arg',), {'kwarg': 'foo'})], self.call_log)
 
63
 
 
64
    def test_cleanup_error(self):
 
65
        """An error from the cleanup function is logged by _run_cleanup, but not
 
66
        propagated.
 
67
 
 
68
        This is there's no way for _run_cleanup to know if there's an existing
 
69
        exception in this situation::
 
70
            try:
 
71
              some_func()
 
72
            finally:
 
73
              _run_cleanup(cleanup_func)
 
74
        So, the best _run_cleanup can do is always log errors but never raise
 
75
        them.
 
76
        """
 
77
        self.assertFalse(_run_cleanup(self.failing_cleanup))
 
78
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
79
 
 
80
    def test_cleanup_error_debug_flag(self):
 
81
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
 
82
        user.
 
83
        """
 
84
        log = StringIO()
 
85
        trace.push_log_file(log)
 
86
        debug.debug_flags.add('cleanup')
 
87
        self.assertFalse(_run_cleanup(self.failing_cleanup))
 
88
        self.assertContainsRe(
 
89
            log.getvalue(),
 
90
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
 
91
 
 
92
    def test_prior_error_cleanup_succeeds(self):
 
93
        """Calling _run_cleanup from a finally block will not interfere with an
 
94
        exception from the try block.
 
95
        """
 
96
        def failing_operation():
 
97
            try:
 
98
                1/0
 
99
            finally:
 
100
                _run_cleanup(self.no_op_cleanup)
 
101
        self.assertRaises(ZeroDivisionError, failing_operation)
 
102
        self.assertEqual(['no_op_cleanup'], self.call_log)
 
103
 
 
104
    def test_prior_error_cleanup_fails(self):
 
105
        """Calling _run_cleanup from a finally block will not interfere with an
 
106
        exception from the try block even when the cleanup itself raises an
 
107
        exception.
 
108
 
 
109
        The cleanup exception will be logged.
 
110
        """
 
111
        def failing_operation():
 
112
            try:
 
113
                1/0
 
114
            finally:
 
115
                _run_cleanup(self.failing_cleanup)
 
116
        self.assertRaises(ZeroDivisionError, failing_operation)
 
117
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
118
 
 
119
 
 
120
class TestDoWithCleanups(CleanupsTestCase):
 
121
 
 
122
    def trivial_func(self):
 
123
        self.call_log.append('trivial_func')
 
124
        return 'trivial result'
 
125
 
 
126
    def test_runs_func(self):
 
127
        """_do_with_cleanups runs the function it is given, and returns the
 
128
        result.
 
129
        """
 
130
        result = _do_with_cleanups([], self.trivial_func)
 
131
        self.assertEqual('trivial result', result)
 
132
 
 
133
    def test_runs_cleanups(self):
 
134
        """Cleanup functions are run (in the given order)."""
 
135
        cleanup_func_1 = (self.call_log.append, ('cleanup 1',), {})
 
136
        cleanup_func_2 = (self.call_log.append, ('cleanup 2',), {})
 
137
        _do_with_cleanups([cleanup_func_1, cleanup_func_2], self.trivial_func)
 
138
        self.assertEqual(
 
139
            ['trivial_func', 'cleanup 1', 'cleanup 2'], self.call_log)
 
140
 
 
141
    def failing_func(self):
 
142
        self.call_log.append('failing_func')
 
143
        1/0
 
144
 
 
145
    def test_func_error_propagates(self):
 
146
        """Errors from the main function are propagated (after running
 
147
        cleanups).
 
148
        """
 
149
        self.assertRaises(
 
150
            ZeroDivisionError, _do_with_cleanups,
 
151
            [(self.no_op_cleanup, (), {})], self.failing_func)
 
152
        self.assertEqual(['failing_func', 'no_op_cleanup'], self.call_log)
 
153
 
 
154
    def test_func_error_trumps_cleanup_error(self):
 
155
        """Errors from the main function a propagated even if a cleanup raises
 
156
        an error.
 
157
 
 
158
        The cleanup error is be logged.
 
159
        """
 
160
        self.assertRaises(
 
161
            ZeroDivisionError, _do_with_cleanups,
 
162
            [(self.failing_cleanup, (), {})], self.failing_func)
 
163
        self.assertLogContains('Cleanup failed:.*failing_cleanup goes boom')
 
164
 
 
165
    def test_func_passes_and_error_from_cleanup(self):
 
166
        """An error from a cleanup is propagated when the main function doesn't
 
167
        raise an error.  Later cleanups are still executed.
 
168
        """
 
169
        exc = self.assertRaises(
 
170
            Exception, _do_with_cleanups,
 
171
            [(self.failing_cleanup, (), {}), (self.no_op_cleanup, (), {})],
 
172
            self.trivial_func)
 
173
        self.assertEqual('failing_cleanup goes boom!', exc.args[0])
 
174
        self.assertEqual(
 
175
            ['trivial_func', 'failing_cleanup', 'no_op_cleanup'],
 
176
            self.call_log)
 
177
 
 
178
    def test_multiple_cleanup_failures(self):
 
179
        """When multiple cleanups fail (as tends to happen when something has
 
180
        gone wrong), the first error is propagated, and subsequent errors are
 
181
        logged.
 
182
        """
 
183
        cleanups = self.make_two_failing_cleanup_funcs()
 
184
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
 
185
            self.trivial_func)
 
186
        self.assertLogContains('Cleanup failed:.*ErrorB')
 
187
        self.assertFalse('ErrorA' in self.get_log())
 
188
 
 
189
    def make_two_failing_cleanup_funcs(self):
 
190
        def raise_a():
 
191
            raise ErrorA('Error A')
 
192
        def raise_b():
 
193
            raise ErrorB('Error B')
 
194
        return [(raise_a, (), {}), (raise_b, (), {})]
 
195
 
 
196
    def test_multiple_cleanup_failures_debug_flag(self):
 
197
        log = StringIO()
 
198
        trace.push_log_file(log)
 
199
        debug.debug_flags.add('cleanup')
 
200
        cleanups = self.make_two_failing_cleanup_funcs()
 
201
        self.assertRaises(ErrorA, _do_with_cleanups, cleanups,
 
202
            self.trivial_func)
 
203
        self.assertContainsRe(
 
204
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
 
205
        self.assertEqual(1, log.getvalue().count('bzr: warning:'),
 
206
                log.getvalue())
 
207
 
 
208
    def test_func_and_cleanup_errors_debug_flag(self):
 
209
        log = StringIO()
 
210
        trace.push_log_file(log)
 
211
        debug.debug_flags.add('cleanup')
 
212
        cleanups = self.make_two_failing_cleanup_funcs()
 
213
        self.assertRaises(ZeroDivisionError, _do_with_cleanups, cleanups,
 
214
            self.failing_func)
 
215
        self.assertContainsRe(
 
216
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error A\n")
 
217
        self.assertContainsRe(
 
218
            log.getvalue(), "bzr: warning: Cleanup failed:.*Error B\n")
 
219
        self.assertEqual(2, log.getvalue().count('bzr: warning:'))
 
220
 
 
221
    def test_func_may_mutate_cleanups(self):
 
222
        """The main func may mutate the cleanups before it returns.
 
223
        
 
224
        This allows a function to gradually add cleanups as it acquires
 
225
        resources, rather than planning all the cleanups up-front.  The
 
226
        OperationWithCleanups helper relies on this working.
 
227
        """
 
228
        cleanups_list = []
 
229
        def func_that_adds_cleanups():
 
230
            self.call_log.append('func_that_adds_cleanups')
 
231
            cleanups_list.append((self.no_op_cleanup, (), {}))
 
232
            return 'result'
 
233
        result = _do_with_cleanups(cleanups_list, func_that_adds_cleanups)
 
234
        self.assertEqual('result', result)
 
235
        self.assertEqual(
 
236
            ['func_that_adds_cleanups', 'no_op_cleanup'], self.call_log)
 
237
 
 
238
    def test_cleanup_error_debug_flag(self):
 
239
        """The -Dcleanup debug flag causes cleanup errors to be reported to the
 
240
        user.
 
241
        """
 
242
        log = StringIO()
 
243
        trace.push_log_file(log)
 
244
        debug.debug_flags.add('cleanup')
 
245
        self.assertRaises(ZeroDivisionError, _do_with_cleanups,
 
246
            [(self.failing_cleanup, (), {})], self.failing_func)
 
247
        self.assertContainsRe(
 
248
            log.getvalue(),
 
249
            "bzr: warning: Cleanup failed:.*failing_cleanup goes boom")
 
250
        self.assertEqual(1, log.getvalue().count('bzr: warning:'))
 
251
 
 
252
 
 
253
class ErrorA(Exception): pass
 
254
class ErrorB(Exception): pass
 
255
 
 
256
 
 
257
class TestOperationWithCleanups(CleanupsTestCase):
 
258
 
 
259
    def test_cleanup_ordering(self):
 
260
        """Cleanups are added in LIFO order.
 
261
 
 
262
        So cleanups added before run is called are run last, and the last
 
263
        cleanup added during the func is run first.
 
264
        """
 
265
        call_log = []
 
266
        def func(op, foo):
 
267
            call_log.append(('func called', foo))
 
268
            op.add_cleanup(call_log.append, 'cleanup 2')
 
269
            op.add_cleanup(call_log.append, 'cleanup 1')
 
270
            return 'result'
 
271
        owc = OperationWithCleanups(func)
 
272
        owc.add_cleanup(call_log.append, 'cleanup 4')
 
273
        owc.add_cleanup(call_log.append, 'cleanup 3')
 
274
        result = owc.run('foo')
 
275
        self.assertEqual('result', result)
 
276
        self.assertEqual(
 
277
            [('func called', 'foo'), 'cleanup 1', 'cleanup 2', 'cleanup 3',
 
278
            'cleanup 4'], call_log)
 
279
 
 
280
 
 
281
class SampleWithCleanups(ObjectWithCleanups):
 
282
 
 
283
    pass
 
284
 
 
285
 
 
286
class TestObjectWithCleanups(TestCase):
 
287
 
 
288
    def test_object_with_cleanups(self):
 
289
        a = []
 
290
        s = SampleWithCleanups()
 
291
        s.add_cleanup(a.append, 42)
 
292
        s.cleanup_now()
 
293
        self.assertEqual(a, [42])