~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_trace.py

  • Committer: John Arbash Meinel
  • Date: 2009-03-04 21:22:50 UTC
  • mto: (0.17.34 trunk)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: john@arbash-meinel.com-20090304212250-xcvwt1yx4zt76pev
Have the GroupCompressBlock decide how to compress the header and content.
It can now decide whether they should be compressed together or not.
As long as we make the to_bytes() function match the from_bytes() one, we should be fine.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2010 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
# "weren't nothing promised to you.  do i look like i got a promise face?"
18
 
 
19
 
"""Tests for trace library"""
20
 
 
21
 
from cStringIO import StringIO
22
 
import errno
23
 
import os
24
 
import re
25
 
import sys
26
 
import tempfile
27
 
 
28
 
from bzrlib import (
29
 
    errors,
30
 
    trace,
31
 
    )
32
 
from bzrlib.tests import features, TestCaseInTempDir, TestCase
33
 
from bzrlib.trace import (
34
 
    mutter, mutter_callsite, report_exception,
35
 
    set_verbosity_level, get_verbosity_level, is_quiet, is_verbose, be_quiet,
36
 
    pop_log_file,
37
 
    push_log_file,
38
 
    _rollover_trace_maybe,
39
 
    show_error,
40
 
    )
41
 
 
42
 
 
43
 
def _format_exception():
44
 
    """Format an exception as it would normally be displayed to the user"""
45
 
    buf = StringIO()
46
 
    report_exception(sys.exc_info(), buf)
47
 
    return buf.getvalue()
48
 
 
49
 
 
50
 
class TestTrace(TestCase):
51
 
 
52
 
    def test_format_sys_exception(self):
53
 
        # Test handling of an internal/unexpected error that probably
54
 
        # indicates a bug in bzr.  The details of the message may vary
55
 
        # depending on whether apport is available or not.  See test_crash for
56
 
        # more.
57
 
        try:
58
 
            raise NotImplementedError, "time travel"
59
 
        except NotImplementedError:
60
 
            pass
61
 
        err = _format_exception()
62
 
        self.assertEqualDiff(err.splitlines()[0],
63
 
                'bzr: ERROR: exceptions.NotImplementedError: time travel')
64
 
        self.assertContainsRe(err,
65
 
            'Bazaar has encountered an internal error.')
66
 
 
67
 
    def test_format_interrupt_exception(self):
68
 
        try:
69
 
            raise KeyboardInterrupt()
70
 
        except KeyboardInterrupt:
71
 
            # XXX: Some risk that a *real* keyboard interrupt won't be seen
72
 
            pass
73
 
        msg = _format_exception()
74
 
        self.assertTrue(len(msg) > 0)
75
 
        self.assertEqualDiff(msg, 'bzr: interrupted\n')
76
 
 
77
 
    def test_format_memory_error(self):
78
 
        try:
79
 
            raise MemoryError()
80
 
        except MemoryError:
81
 
            pass
82
 
        msg = _format_exception()
83
 
        self.assertEquals(msg,
84
 
            "bzr: out of memory\n")
85
 
 
86
 
    def test_format_os_error(self):
87
 
        try:
88
 
            os.rmdir('nosuchfile22222')
89
 
        except OSError, e:
90
 
            e_str = str(e)
91
 
        msg = _format_exception()
92
 
        # Linux seems to give "No such file" but Windows gives "The system
93
 
        # cannot find the file specified".
94
 
        self.assertEqual('bzr: ERROR: %s\n' % (e_str,), msg)
95
 
 
96
 
    def test_format_io_error(self):
97
 
        try:
98
 
            file('nosuchfile22222')
99
 
        except IOError:
100
 
            pass
101
 
        msg = _format_exception()
102
 
        # Even though Windows and Linux differ for 'os.rmdir', they both give
103
 
        # 'No such file' for open()
104
 
        self.assertContainsRe(msg,
105
 
            r'^bzr: ERROR: \[Errno .*\] No such file.*nosuchfile')
106
 
 
107
 
    def test_format_pywintypes_error(self):
108
 
        self.requireFeature(features.pywintypes)
109
 
        import pywintypes, win32file
110
 
        try:
111
 
            win32file.RemoveDirectory('nosuchfile22222')
112
 
        except pywintypes.error:
113
 
            pass
114
 
        msg = _format_exception()
115
 
        # GZ 2010-05-03: Formatting for pywintypes.error is basic, a 3-tuple
116
 
        #                with errno, function name, and locale error message
117
 
        self.assertContainsRe(msg,
118
 
            r"^bzr: ERROR: \(2, 'RemoveDirectory[AW]?', .*\)")
119
 
 
120
 
    def test_format_unicode_error(self):
121
 
        try:
122
 
            raise errors.BzrCommandError(u'argument foo\xb5 does not exist')
123
 
        except errors.BzrCommandError:
124
 
            pass
125
 
        msg = _format_exception()
126
 
 
127
 
    def test_format_exception(self):
128
 
        """Short formatting of bzr exceptions"""
129
 
        try:
130
 
            raise errors.NotBranchError('wibble')
131
 
        except errors.NotBranchError:
132
 
            pass
133
 
        msg = _format_exception()
134
 
        self.assertTrue(len(msg) > 0)
135
 
        self.assertEqualDiff(msg, 'bzr: ERROR: Not a branch: \"wibble\".\n')
136
 
 
137
 
    def test_report_external_import_error(self):
138
 
        """Short friendly message for missing system modules."""
139
 
        try:
140
 
            import ImaginaryModule
141
 
        except ImportError, e:
142
 
            pass
143
 
        else:
144
 
            self.fail("somehow succeeded in importing %r" % ImaginaryModule)
145
 
        msg = _format_exception()
146
 
        self.assertEqual(msg,
147
 
            'bzr: ERROR: No module named ImaginaryModule\n'
148
 
            'You may need to install this Python library separately.\n')
149
 
 
150
 
    def test_report_import_syntax_error(self):
151
 
        try:
152
 
            raise ImportError("syntax error")
153
 
        except ImportError, e:
154
 
            pass
155
 
        msg = _format_exception()
156
 
        self.assertContainsRe(msg,
157
 
            r'Bazaar has encountered an internal error')
158
 
 
159
 
    def test_trace_unicode(self):
160
 
        """Write Unicode to trace log"""
161
 
        self.log(u'the unicode character for benzene is \N{BENZENE RING}')
162
 
        log = self.get_log()
163
 
        self.assertContainsRe(log, "the unicode character for benzene is")
164
 
 
165
 
    def test_trace_argument_unicode(self):
166
 
        """Write a Unicode argument to the trace log"""
167
 
        mutter(u'the unicode character for benzene is %s', u'\N{BENZENE RING}')
168
 
        log = self.get_log()
169
 
        self.assertContainsRe(log, 'the unicode character')
170
 
 
171
 
    def test_trace_argument_utf8(self):
172
 
        """Write a Unicode argument to the trace log"""
173
 
        mutter(u'the unicode character for benzene is %s',
174
 
               u'\N{BENZENE RING}'.encode('utf-8'))
175
 
        log = self.get_log()
176
 
        self.assertContainsRe(log, 'the unicode character')
177
 
 
178
 
    def test_report_broken_pipe(self):
179
 
        try:
180
 
            raise IOError(errno.EPIPE, 'broken pipe foofofo')
181
 
        except IOError, e:
182
 
            msg = _format_exception()
183
 
            self.assertEquals(msg, "bzr: broken pipe\n")
184
 
        else:
185
 
            self.fail("expected error not raised")
186
 
 
187
 
    def assertLogStartsWith(self, log, string):
188
 
        """Like assertStartsWith, but skips the log timestamp."""
189
 
        self.assertContainsRe(log,
190
 
            '^\\d+\\.\\d+  ' + re.escape(string))
191
 
 
192
 
    def test_mutter_callsite_1(self):
193
 
        """mutter_callsite can capture 1 level of stack frame."""
194
 
        mutter_callsite(1, "foo %s", "a string")
195
 
        log = self.get_log()
196
 
        # begin with the message
197
 
        self.assertLogStartsWith(log, 'foo a string\nCalled from:\n')
198
 
        # should show two frame: this frame and the one above
199
 
        self.assertContainsRe(log,
200
 
            'test_trace\\.py", line \\d+, in test_mutter_callsite_1\n')
201
 
        # this frame should be the final one
202
 
        self.assertEndsWith(log, ' "a string")\n')
203
 
 
204
 
    def test_mutter_callsite_2(self):
205
 
        """mutter_callsite can capture 2 levels of stack frame."""
206
 
        mutter_callsite(2, "foo %s", "a string")
207
 
        log = self.get_log()
208
 
        # begin with the message
209
 
        self.assertLogStartsWith(log, 'foo a string\nCalled from:\n')
210
 
        # should show two frame: this frame and the one above
211
 
        self.assertContainsRe(log,
212
 
            'test_trace.py", line \d+, in test_mutter_callsite_2\n')
213
 
        # this frame should be the final one
214
 
        self.assertEndsWith(log, ' "a string")\n')
215
 
 
216
 
    def test_mutter_never_fails(self):
217
 
        # Even if the decode/encode stage fails, mutter should not
218
 
        # raise an exception
219
 
        # This test checks that mutter doesn't fail; the current behaviour
220
 
        # is that it doesn't fail *and writes non-utf8*.
221
 
        mutter(u'Writing a greek mu (\xb5) works in a unicode string')
222
 
        mutter('But fails in an ascii string \xb5')
223
 
        mutter('and in an ascii argument: %s', '\xb5')
224
 
        log = self.get_log()
225
 
        self.assertContainsRe(log, 'Writing a greek mu')
226
 
        self.assertContainsRe(log, "But fails in an ascii string")
227
 
        # However, the log content object does unicode replacement on reading
228
 
        # to let it get unicode back where good data has been written. So we
229
 
        # have to do a replaceent here as well.
230
 
        self.assertContainsRe(log, "ascii argument: \xb5".decode('utf8',
231
 
            'replace'))
232
 
        
233
 
    def test_show_error(self):
234
 
        show_error('error1')
235
 
        show_error(u'error2 \xb5 blah')
236
 
        show_error('arg: %s', 'blah')
237
 
        show_error('arg2: %(key)s', {'key':'stuff'})
238
 
        try:
239
 
            raise Exception("oops")
240
 
        except:
241
 
            show_error('kwarg', exc_info=True)
242
 
        log = self.get_log()
243
 
        self.assertContainsRe(log, 'error1')
244
 
        self.assertContainsRe(log, u'error2 \xb5 blah')
245
 
        self.assertContainsRe(log, 'arg: blah')
246
 
        self.assertContainsRe(log, 'arg2: stuff')
247
 
        self.assertContainsRe(log, 'kwarg')
248
 
        self.assertContainsRe(log, 'Traceback \\(most recent call last\\):')
249
 
        self.assertContainsRe(log, 'File ".*test_trace.py", line .*, in test_show_error')
250
 
        self.assertContainsRe(log, 'raise Exception\\("oops"\\)')
251
 
        self.assertContainsRe(log, 'Exception: oops')
252
 
 
253
 
    def test_push_log_file(self):
254
 
        """Can push and pop log file, and this catches mutter messages.
255
 
 
256
 
        This is primarily for use in the test framework.
257
 
        """
258
 
        tmp1 = tempfile.NamedTemporaryFile()
259
 
        tmp2 = tempfile.NamedTemporaryFile()
260
 
        try:
261
 
            memento1 = push_log_file(tmp1)
262
 
            mutter("comment to file1")
263
 
            try:
264
 
                memento2 = push_log_file(tmp2)
265
 
                try:
266
 
                    mutter("comment to file2")
267
 
                finally:
268
 
                    pop_log_file(memento2)
269
 
                mutter("again to file1")
270
 
            finally:
271
 
                pop_log_file(memento1)
272
 
            # the files were opened in binary mode, so should have exactly
273
 
            # these bytes.  and removing the file as the log target should
274
 
            # have caused them to be flushed out.  need to match using regexps
275
 
            # as there's a timestamp at the front.
276
 
            tmp1.seek(0)
277
 
            self.assertContainsRe(tmp1.read(),
278
 
                r"\d+\.\d+  comment to file1\n\d+\.\d+  again to file1\n")
279
 
            tmp2.seek(0)
280
 
            self.assertContainsRe(tmp2.read(),
281
 
                r"\d+\.\d+  comment to file2\n")
282
 
        finally:
283
 
            tmp1.close()
284
 
            tmp2.close()
285
 
 
286
 
    def test__open_bzr_log_uses_stderr_for_failures(self):
287
 
        # If _open_bzr_log cannot open the file, then we should write the
288
 
        # warning to stderr. Since this is normally happening before logging is
289
 
        # set up.
290
 
        self.overrideAttr(sys, 'stderr', StringIO())
291
 
        # Set the log file to something that cannot exist
292
 
        # FIXME: A bit dangerous: we are not in an isolated dir here -- vilajam
293
 
        # 20100125
294
 
        os.environ['BZR_LOG'] = os.getcwd() + '/no-dir/bzr.log'
295
 
        self.overrideAttr(trace, '_bzr_log_filename')
296
 
        logf = trace._open_bzr_log()
297
 
        self.assertIs(None, logf)
298
 
        self.assertContainsRe(sys.stderr.getvalue(),
299
 
                              'failed to open trace file: .*/no-dir/bzr.log')
300
 
 
301
 
 
302
 
class TestVerbosityLevel(TestCase):
303
 
 
304
 
    def test_verbosity_level(self):
305
 
        set_verbosity_level(1)
306
 
        self.assertEqual(1, get_verbosity_level())
307
 
        self.assertTrue(is_verbose())
308
 
        self.assertFalse(is_quiet())
309
 
        set_verbosity_level(-1)
310
 
        self.assertEqual(-1, get_verbosity_level())
311
 
        self.assertFalse(is_verbose())
312
 
        self.assertTrue(is_quiet())
313
 
        set_verbosity_level(0)
314
 
        self.assertEqual(0, get_verbosity_level())
315
 
        self.assertFalse(is_verbose())
316
 
        self.assertFalse(is_quiet())
317
 
 
318
 
    def test_be_quiet(self):
319
 
        # Confirm the old API still works
320
 
        be_quiet(True)
321
 
        self.assertEqual(-1, get_verbosity_level())
322
 
        be_quiet(False)
323
 
        self.assertEqual(0, get_verbosity_level())
324
 
 
325
 
 
326
 
class TestBzrLog(TestCaseInTempDir):
327
 
 
328
 
    def test_log_rollover(self):
329
 
        temp_log_name = 'test-log'
330
 
        trace_file = open(temp_log_name, 'at')
331
 
        trace_file.writelines(['test_log_rollover padding\n'] * 200000)
332
 
        trace_file.close()
333
 
        _rollover_trace_maybe(temp_log_name)
334
 
        # should have been rolled over
335
 
        self.assertFalse(os.access(temp_log_name, os.R_OK))
336
 
 
337
 
 
338
 
class TestTraceConfiguration(TestCaseInTempDir):
339
 
 
340
 
    def test_default_config(self):
341
 
        config = trace.DefaultConfig()
342
 
        self.overrideAttr(trace, "_bzr_log_filename", None)
343
 
        trace._bzr_log_filename = None
344
 
        expected_filename = trace._get_bzr_log_filename()
345
 
        self.assertEqual(None, trace._bzr_log_filename)
346
 
        config.__enter__()
347
 
        try:
348
 
            # Should have entered and setup a default filename.
349
 
            self.assertEqual(expected_filename, trace._bzr_log_filename)
350
 
        finally:
351
 
            config.__exit__(None, None, None)
352
 
            # Should have exited and cleaned up.
353
 
            self.assertEqual(None, trace._bzr_log_filename)