~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_trace.py

  • Committer: John Arbash Meinel
  • Date: 2006-08-18 16:48:53 UTC
  • mto: (1946.2.6 reduce-knit-churn)
  • mto: This revision was merged to the branch mainline in revision 1948.
  • Revision ID: john@arbash-meinel.com-20060818164853-eb25e4db3385e216
Add a couple more sftp benchmarks

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005-2011, 2016 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
# "weren't nothing promised to you.  do i look like i got a promise face?"
18
18
 
19
19
"""Tests for trace library"""
20
20
 
21
 
from cStringIO import StringIO
22
21
import errno
23
 
import logging
24
22
import os
25
 
import re
26
23
import sys
27
 
import tempfile
 
24
from StringIO import StringIO
28
25
 
29
 
from bzrlib import (
30
 
    debug,
31
 
    errors,
32
 
    trace,
33
 
    )
34
 
from bzrlib.tests import features, TestCaseInTempDir, TestCase
35
 
from bzrlib.trace import (
36
 
    mutter, mutter_callsite, report_exception,
37
 
    set_verbosity_level, get_verbosity_level, is_quiet, is_verbose, be_quiet,
38
 
    pop_log_file,
39
 
    push_log_file,
40
 
    _rollover_trace_maybe,
41
 
    show_error,
42
 
    )
 
26
from bzrlib.tests import TestCaseInTempDir, TestCase
 
27
from bzrlib.trace import mutter, report_exception
 
28
from bzrlib.errors import NotBranchError
43
29
 
44
30
 
45
31
def _format_exception():
52
38
class TestTrace(TestCase):
53
39
 
54
40
    def test_format_sys_exception(self):
55
 
        # Test handling of an internal/unexpected error that probably
56
 
        # indicates a bug in bzr.  The details of the message may vary
57
 
        # depending on whether apport is available or not.  See test_crash for
58
 
        # more.
59
41
        try:
60
42
            raise NotImplementedError, "time travel"
61
43
        except NotImplementedError:
64
46
        self.assertEqualDiff(err.splitlines()[0],
65
47
                'bzr: ERROR: exceptions.NotImplementedError: time travel')
66
48
        self.assertContainsRe(err,
67
 
            'Bazaar has encountered an internal error.')
 
49
                r'File.*test_trace.py')
68
50
 
69
51
    def test_format_interrupt_exception(self):
70
52
        try:
76
58
        self.assertTrue(len(msg) > 0)
77
59
        self.assertEqualDiff(msg, 'bzr: interrupted\n')
78
60
 
79
 
    def test_format_memory_error(self):
80
 
        try:
81
 
            raise MemoryError()
82
 
        except MemoryError:
83
 
            pass
84
 
        msg = _format_exception()
85
 
        self.assertEqual(msg,
86
 
            "bzr: out of memory\nUse -Dmem_dump to dump memory to a file.\n")
87
 
 
88
 
    def test_format_mem_dump(self):
89
 
        self.requireFeature(features.meliae)
90
 
        debug.debug_flags.add('mem_dump')
91
 
        try:
92
 
            raise MemoryError()
93
 
        except MemoryError:
94
 
            pass
95
 
        msg = _format_exception()
96
 
        self.assertStartsWith(msg,
97
 
            "bzr: out of memory\nMemory dumped to ")
98
 
 
99
61
    def test_format_os_error(self):
100
62
        try:
101
 
            os.rmdir('nosuchfile22222')
102
 
        except OSError, e:
103
 
            e_str = str(e)
104
 
        msg = _format_exception()
105
 
        # Linux seems to give "No such file" but Windows gives "The system
106
 
        # cannot find the file specified".
107
 
        self.assertEqual('bzr: ERROR: %s\n' % (e_str,), msg)
108
 
 
109
 
    def test_format_io_error(self):
110
 
        try:
111
63
            file('nosuchfile22222')
112
 
        except IOError:
113
 
            pass
114
 
        msg = _format_exception()
115
 
        # Even though Windows and Linux differ for 'os.rmdir', they both give
116
 
        # 'No such file' for open()
117
 
        # However it now gets translated so we can not test for a specific message
118
 
        self.assertContainsRe(msg,
119
 
            r'^bzr: ERROR: \[Errno .*\] .*nosuchfile')
120
 
 
121
 
    def test_format_pywintypes_error(self):
122
 
        self.requireFeature(features.pywintypes)
123
 
        import pywintypes, win32file
124
 
        try:
125
 
            win32file.RemoveDirectory('nosuchfile22222')
126
 
        except pywintypes.error:
127
 
            pass
128
 
        msg = _format_exception()
129
 
        # GZ 2010-05-03: Formatting for pywintypes.error is basic, a 3-tuple
130
 
        #                with errno, function name, and locale error message
131
 
        self.assertContainsRe(msg,
132
 
            r"^bzr: ERROR: \(2, 'RemoveDirectory[AW]?', .*\)")
133
 
            
134
 
    def test_format_sockets_error(self):
135
 
        try:
136
 
            import socket
137
 
            sock = socket.socket()
138
 
            sock.send("This should fail.")
139
 
        except socket.error:
140
 
            pass
141
 
        msg = _format_exception()
142
 
        
143
 
        self.assertNotContainsRe(msg,
144
 
            r"Traceback (most recent call last):")
145
 
 
146
 
    def test_format_unicode_error(self):
147
 
        try:
148
 
            raise errors.BzrCommandError(u'argument foo\xb5 does not exist')
149
 
        except errors.BzrCommandError:
150
 
            pass
151
 
        msg = _format_exception()
 
64
        except (OSError, IOError):
 
65
            pass
 
66
        msg = _format_exception()
 
67
        self.assertContainsRe(msg, r'^bzr: ERROR: \[Errno .*\] No such file.*nosuchfile')
 
68
 
152
69
 
153
70
    def test_format_exception(self):
154
71
        """Short formatting of bzr exceptions"""
155
72
        try:
156
 
            raise errors.NotBranchError('wibble')
157
 
        except errors.NotBranchError:
 
73
            raise NotBranchError, 'wibble'
 
74
        except NotBranchError:
158
75
            pass
159
76
        msg = _format_exception()
160
77
        self.assertTrue(len(msg) > 0)
161
 
        self.assertEqualDiff(msg, 'bzr: ERROR: Not a branch: \"wibble\".\n')
162
 
 
163
 
    def test_report_external_import_error(self):
164
 
        """Short friendly message for missing system modules."""
165
 
        try:
166
 
            import ImaginaryModule
167
 
        except ImportError, e:
168
 
            pass
169
 
        else:
170
 
            self.fail("somehow succeeded in importing %r" % ImaginaryModule)
171
 
        msg = _format_exception()
172
 
        self.assertEqual(msg,
173
 
            'bzr: ERROR: No module named ImaginaryModule\n'
174
 
            'You may need to install this Python library separately.\n')
175
 
 
176
 
    def test_report_import_syntax_error(self):
177
 
        try:
178
 
            raise ImportError("syntax error")
179
 
        except ImportError, e:
180
 
            pass
181
 
        msg = _format_exception()
182
 
        self.assertContainsRe(msg,
183
 
            r'Bazaar has encountered an internal error')
 
78
        self.assertEqualDiff(msg, 'bzr: ERROR: Not a branch: wibble\n')
184
79
 
185
80
    def test_trace_unicode(self):
186
81
        """Write Unicode to trace log"""
187
82
        self.log(u'the unicode character for benzene is \N{BENZENE RING}')
188
 
        log = self.get_log()
189
 
        self.assertContainsRe(log, "the unicode character for benzene is")
190
 
 
191
 
    def test_trace_argument_unicode(self):
192
 
        """Write a Unicode argument to the trace log"""
193
 
        mutter(u'the unicode character for benzene is %s', u'\N{BENZENE RING}')
194
 
        log = self.get_log()
195
 
        self.assertContainsRe(log, 'the unicode character')
196
 
 
197
 
    def test_trace_argument_utf8(self):
198
 
        """Write a Unicode argument to the trace log"""
199
 
        mutter(u'the unicode character for benzene is %s',
200
 
               u'\N{BENZENE RING}'.encode('utf-8'))
201
 
        log = self.get_log()
202
 
        self.assertContainsRe(log, 'the unicode character')
 
83
        self.assertContainsRe('the unicode character',
 
84
                self._get_log())
203
85
 
204
86
    def test_report_broken_pipe(self):
205
87
        try:
206
88
            raise IOError(errno.EPIPE, 'broken pipe foofofo')
207
 
        except IOError as e:
 
89
        except IOError, e:
208
90
            msg = _format_exception()
209
 
            self.assertEqual(msg, "bzr: broken pipe\n")
 
91
            self.assertEquals(msg, "bzr: broken pipe\n")
210
92
        else:
211
93
            self.fail("expected error not raised")
212
94
 
213
 
    def assertLogStartsWith(self, log, string):
214
 
        """Like assertStartsWith, but skips the log timestamp."""
215
 
        self.assertContainsRe(log,
216
 
            '^\\d+\\.\\d+  ' + re.escape(string))
217
 
 
218
 
    def test_mutter_callsite_1(self):
219
 
        """mutter_callsite can capture 1 level of stack frame."""
220
 
        mutter_callsite(1, "foo %s", "a string")
221
 
        log = self.get_log()
222
 
        # begin with the message
223
 
        self.assertLogStartsWith(log, 'foo a string\nCalled from:\n')
224
 
        # should show two frame: this frame and the one above
225
 
        self.assertContainsRe(log,
226
 
            'test_trace\\.py", line \\d+, in test_mutter_callsite_1\n')
227
 
        # this frame should be the final one
228
 
        self.assertEndsWith(log, ' "a string")\n')
229
 
 
230
 
    def test_mutter_callsite_2(self):
231
 
        """mutter_callsite can capture 2 levels of stack frame."""
232
 
        mutter_callsite(2, "foo %s", "a string")
233
 
        log = self.get_log()
234
 
        # begin with the message
235
 
        self.assertLogStartsWith(log, 'foo a string\nCalled from:\n')
236
 
        # should show two frame: this frame and the one above
237
 
        self.assertContainsRe(log,
238
 
            'test_trace.py", line \d+, in test_mutter_callsite_2\n')
239
 
        # this frame should be the final one
240
 
        self.assertEndsWith(log, ' "a string")\n')
241
 
 
242
95
    def test_mutter_never_fails(self):
243
96
        # Even if the decode/encode stage fails, mutter should not
244
97
        # raise an exception
245
 
        # This test checks that mutter doesn't fail; the current behaviour
246
 
        # is that it doesn't fail *and writes non-utf8*.
247
98
        mutter(u'Writing a greek mu (\xb5) works in a unicode string')
248
99
        mutter('But fails in an ascii string \xb5')
249
 
        mutter('and in an ascii argument: %s', '\xb5')
250
 
        log = self.get_log()
 
100
        # TODO: jam 20051227 mutter() doesn't flush the log file, and
 
101
        #       self._get_log() opens the file directly and reads it.
 
102
        #       So we need to manually flush the log file
 
103
        import bzrlib.trace
 
104
        bzrlib.trace._trace_file.flush()
 
105
        log = self._get_log()
251
106
        self.assertContainsRe(log, 'Writing a greek mu')
252
 
        self.assertContainsRe(log, "But fails in an ascii string")
253
 
        # However, the log content object does unicode replacement on reading
254
 
        # to let it get unicode back where good data has been written. So we
255
 
        # have to do a replaceent here as well.
256
 
        self.assertContainsRe(log, "ascii argument: \xb5".decode('utf8',
257
 
            'replace'))
258
 
 
259
 
    def test_show_error(self):
260
 
        show_error('error1')
261
 
        show_error(u'error2 \xb5 blah')
262
 
        show_error('arg: %s', 'blah')
263
 
        show_error('arg2: %(key)s', {'key':'stuff'})
264
 
        try:
265
 
            raise Exception("oops")
266
 
        except:
267
 
            show_error('kwarg', exc_info=True)
268
 
        log = self.get_log()
269
 
        self.assertContainsRe(log, 'error1')
270
 
        self.assertContainsRe(log, u'error2 \xb5 blah')
271
 
        self.assertContainsRe(log, 'arg: blah')
272
 
        self.assertContainsRe(log, 'arg2: stuff')
273
 
        self.assertContainsRe(log, 'kwarg')
274
 
        self.assertContainsRe(log, 'Traceback \\(most recent call last\\):')
275
 
        self.assertContainsRe(log, 'File ".*test_trace.py", line .*, in test_show_error')
276
 
        self.assertContainsRe(log, 'raise Exception\\("oops"\\)')
277
 
        self.assertContainsRe(log, 'Exception: oops')
278
 
 
279
 
    def test_push_log_file(self):
280
 
        """Can push and pop log file, and this catches mutter messages.
281
 
 
282
 
        This is primarily for use in the test framework.
283
 
        """
284
 
        tmp1 = tempfile.NamedTemporaryFile()
285
 
        tmp2 = tempfile.NamedTemporaryFile()
286
 
        try:
287
 
            memento1 = push_log_file(tmp1)
288
 
            mutter("comment to file1")
289
 
            try:
290
 
                memento2 = push_log_file(tmp2)
291
 
                try:
292
 
                    mutter("comment to file2")
293
 
                finally:
294
 
                    pop_log_file(memento2)
295
 
                mutter("again to file1")
296
 
            finally:
297
 
                pop_log_file(memento1)
298
 
            # the files were opened in binary mode, so should have exactly
299
 
            # these bytes.  and removing the file as the log target should
300
 
            # have caused them to be flushed out.  need to match using regexps
301
 
            # as there's a timestamp at the front.
302
 
            tmp1.seek(0)
303
 
            self.assertContainsRe(tmp1.read(),
304
 
                r"\d+\.\d+  comment to file1\n\d+\.\d+  again to file1\n")
305
 
            tmp2.seek(0)
306
 
            self.assertContainsRe(tmp2.read(),
307
 
                r"\d+\.\d+  comment to file2\n")
308
 
        finally:
309
 
            tmp1.close()
310
 
            tmp2.close()
311
 
 
312
 
    def test__open_bzr_log_uses_stderr_for_failures(self):
313
 
        # If _open_bzr_log cannot open the file, then we should write the
314
 
        # warning to stderr. Since this is normally happening before logging is
315
 
        # set up.
316
 
        self.overrideAttr(sys, 'stderr', StringIO())
317
 
        # Set the log file to something that cannot exist
318
 
        self.overrideEnv('BZR_LOG', os.getcwd() + '/no-dir/bzr.log')
319
 
        self.overrideAttr(trace, '_bzr_log_filename')
320
 
        logf = trace._open_bzr_log()
321
 
        self.assertIs(None, logf)
322
 
        self.assertContainsRe(sys.stderr.getvalue(),
323
 
                              'failed to open trace file: .*/no-dir/bzr.log')
324
 
 
325
 
 
326
 
class TestVerbosityLevel(TestCase):
327
 
 
328
 
    def test_verbosity_level(self):
329
 
        set_verbosity_level(1)
330
 
        self.assertEqual(1, get_verbosity_level())
331
 
        self.assertTrue(is_verbose())
332
 
        self.assertFalse(is_quiet())
333
 
        set_verbosity_level(-1)
334
 
        self.assertEqual(-1, get_verbosity_level())
335
 
        self.assertFalse(is_verbose())
336
 
        self.assertTrue(is_quiet())
337
 
        set_verbosity_level(0)
338
 
        self.assertEqual(0, get_verbosity_level())
339
 
        self.assertFalse(is_verbose())
340
 
        self.assertFalse(is_quiet())
341
 
 
342
 
    def test_be_quiet(self):
343
 
        # Confirm the old API still works
344
 
        be_quiet(True)
345
 
        self.assertEqual(-1, get_verbosity_level())
346
 
        be_quiet(False)
347
 
        self.assertEqual(0, get_verbosity_level())
348
 
 
349
 
 
350
 
class TestLogging(TestCase):
351
 
    """Check logging functionality robustly records information"""
352
 
 
353
 
    def test_note(self):
354
 
        trace.note("Noted")
355
 
        self.assertEqual("    INFO  Noted\n", self.get_log())
356
 
 
357
 
    def test_warning(self):
358
 
        trace.warning("Warned")
359
 
        self.assertEqual(" WARNING  Warned\n", self.get_log())
360
 
 
361
 
    def test_log(self):
362
 
        logging.getLogger("bzr").error("Errored")
363
 
        self.assertEqual("   ERROR  Errored\n", self.get_log())
364
 
 
365
 
    def test_log_sub(self):
366
 
        logging.getLogger("bzr.test_log_sub").debug("Whispered")
367
 
        self.assertEqual("   DEBUG  Whispered\n", self.get_log())
368
 
 
369
 
    def test_log_unicode_msg(self):
370
 
        logging.getLogger("bzr").debug(u"\xa7")
371
 
        self.assertEqual(u"   DEBUG  \xa7\n", self.get_log())
372
 
 
373
 
    def test_log_unicode_arg(self):
374
 
        logging.getLogger("bzr").debug("%s", u"\xa7")
375
 
        self.assertEqual(u"   DEBUG  \xa7\n", self.get_log())
376
 
 
377
 
    def test_log_utf8_msg(self):
378
 
        logging.getLogger("bzr").debug("\xc2\xa7")
379
 
        self.assertEqual(u"   DEBUG  \xa7\n", self.get_log())
380
 
 
381
 
    def test_log_utf8_arg(self):
382
 
        logging.getLogger("bzr").debug("%s", "\xc2\xa7")
383
 
        self.assertEqual(u"   DEBUG  \xa7\n", self.get_log())
384
 
 
385
 
    def test_log_bytes_msg(self):
386
 
        logging.getLogger("bzr").debug("\xa7")
387
 
        log = self.get_log()
388
 
        self.assertContainsString(log, "UnicodeDecodeError: ")
389
 
        self.assertContainsString(log,
390
 
            "Logging record unformattable: '\\xa7' % ()\n")
391
 
 
392
 
    def test_log_bytes_arg(self):
393
 
        logging.getLogger("bzr").debug("%s", "\xa7")
394
 
        log = self.get_log()
395
 
        self.assertContainsString(log, "UnicodeDecodeError: ")
396
 
        self.assertContainsString(log,
397
 
            "Logging record unformattable: '%s' % ('\\xa7',)\n")
398
 
 
399
 
    def test_log_mixed_strings(self):
400
 
        logging.getLogger("bzr").debug(u"%s", "\xa7")
401
 
        log = self.get_log()
402
 
        self.assertContainsString(log, "UnicodeDecodeError: ")
403
 
        self.assertContainsString(log,
404
 
            "Logging record unformattable: u'%s' % ('\\xa7',)\n")
405
 
 
406
 
    def test_log_repr_broken(self):
407
 
        class BadRepr(object):
408
 
            def __repr__(self):
409
 
                raise ValueError("Broken object")
410
 
        logging.getLogger("bzr").debug("%s", BadRepr())
411
 
        log = self.get_log()
412
 
        self.assertContainsRe(log, "ValueError: Broken object\n")
413
 
        self.assertContainsRe(log, "Logging record unformattable: '%s' % .*\n")
414
 
 
415
 
 
416
 
class TestBzrLog(TestCaseInTempDir):
417
 
 
418
 
    def test_log_rollover(self):
419
 
        temp_log_name = 'test-log'
420
 
        trace_file = open(temp_log_name, 'at')
421
 
        trace_file.writelines(['test_log_rollover padding\n'] * 200000)
422
 
        trace_file.close()
423
 
        _rollover_trace_maybe(temp_log_name)
424
 
        # should have been rolled over
425
 
        self.assertFalse(os.access(temp_log_name, os.R_OK))
426
 
 
427
 
 
428
 
class TestTraceConfiguration(TestCaseInTempDir):
429
 
 
430
 
    def test_default_config(self):
431
 
        config = trace.DefaultConfig()
432
 
        self.overrideAttr(trace, "_bzr_log_filename", None)
433
 
        trace._bzr_log_filename = None
434
 
        expected_filename = trace._get_bzr_log_filename()
435
 
        self.assertEqual(None, trace._bzr_log_filename)
436
 
        config.__enter__()
437
 
        try:
438
 
            # Should have entered and setup a default filename.
439
 
            self.assertEqual(expected_filename, trace._bzr_log_filename)
440
 
        finally:
441
 
            config.__exit__(None, None, None)
442
 
            # Should have exited and cleaned up.
443
 
            self.assertEqual(None, trace._bzr_log_filename)
 
107
        self.assertContainsRe(log, 'UnicodeError')
 
108
        self.assertContainsRe(log, "'But fails in an ascii string")