~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/__init__.py

  • Committer: Neil Santos
  • Date: 2010-03-05 05:30:19 UTC
  • mto: (5080.1.1 integration)
  • mto: This revision was merged to the branch mainline in revision 5081.
  • Revision ID: neil_santos@users.sourceforge.net-20100305053019-shzuqnhliw67q1nm
Renamed link() methods to hardlink(), as per mbp's suggestion

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
# along with this program; if not, write to the Free Software
15
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
16
 
17
 
"""Testing framework extensions"""
18
17
 
19
18
# TODO: Perhaps there should be an API to find out if bzr running under the
20
19
# test suite -- some plugins might want to avoid making intrusive changes if
29
28
 
30
29
import atexit
31
30
import codecs
32
 
import copy
 
31
from copy import copy
33
32
from cStringIO import StringIO
34
33
import difflib
35
34
import doctest
36
35
import errno
37
 
import itertools
38
36
import logging
 
37
import math
39
38
import os
40
 
import platform
41
 
import pprint
 
39
from pprint import pformat
42
40
import random
43
41
import re
44
42
import shlex
45
43
import stat
46
 
import subprocess
 
44
from subprocess import Popen, PIPE, STDOUT
47
45
import sys
48
46
import tempfile
49
47
import threading
71
69
    lock as _mod_lock,
72
70
    memorytree,
73
71
    osutils,
 
72
    progress,
74
73
    ui,
75
74
    urlutils,
76
75
    registry,
77
 
    transport as _mod_transport,
78
76
    workingtree,
79
77
    )
80
78
import bzrlib.branch
104
102
    )
105
103
import bzrlib.trace
106
104
from bzrlib.transport import (
 
105
    get_transport,
107
106
    memory,
108
107
    pathfilter,
109
108
    )
 
109
import bzrlib.transport
110
110
from bzrlib.trace import mutter, note
111
111
from bzrlib.tests import (
112
112
    test_server,
113
113
    TestUtil,
114
 
    treeshape,
115
114
    )
 
115
from bzrlib.tests.http_server import HttpServer
 
116
from bzrlib.tests.TestUtil import (
 
117
                          TestSuite,
 
118
                          TestLoader,
 
119
                          )
 
120
from bzrlib.tests.treeshape import build_tree_contents
116
121
from bzrlib.ui import NullProgressView
117
122
from bzrlib.ui.text import TextUIFactory
118
123
import bzrlib.version_info_formats.format_custom
134
139
SUBUNIT_SEEK_SET = 0
135
140
SUBUNIT_SEEK_CUR = 1
136
141
 
137
 
# These are intentionally brought into this namespace. That way plugins, etc
138
 
# can just "from bzrlib.tests import TestCase, TestLoader, etc"
139
 
TestSuite = TestUtil.TestSuite
140
 
TestLoader = TestUtil.TestLoader
141
142
 
142
 
class ExtendedTestResult(testtools.TextTestResult):
 
143
class ExtendedTestResult(unittest._TextTestResult):
143
144
    """Accepts, reports and accumulates the results of running tests.
144
145
 
145
146
    Compared to the unittest version this class adds support for
166
167
        :param bench_history: Optionally, a writable file object to accumulate
167
168
            benchmark results.
168
169
        """
169
 
        testtools.TextTestResult.__init__(self, stream)
 
170
        unittest._TextTestResult.__init__(self, stream, descriptions, verbosity)
170
171
        if bench_history is not None:
171
172
            from bzrlib.version import _get_bzr_source_tree
172
173
            src_tree = _get_bzr_source_tree()
193
194
        self.count = 0
194
195
        self._overall_start_time = time.time()
195
196
        self._strict = strict
196
 
        self._first_thread_leaker_id = None
197
 
        self._tests_leaking_threads_count = 0
198
197
 
199
198
    def stopTestRun(self):
200
199
        run = self.testsRun
201
200
        actionTaken = "Ran"
202
201
        stopTime = time.time()
203
202
        timeTaken = stopTime - self.startTime
204
 
        # GZ 2010-07-19: Seems testtools has no printErrors method, and though
205
 
        #                the parent class method is similar have to duplicate
206
 
        self._show_list('ERROR', self.errors)
207
 
        self._show_list('FAIL', self.failures)
208
 
        self.stream.write(self.sep2)
209
 
        self.stream.write("%s %d test%s in %.3fs\n\n" % (actionTaken,
 
203
        self.printErrors()
 
204
        self.stream.writeln(self.separator2)
 
205
        self.stream.writeln("%s %d test%s in %.3fs" % (actionTaken,
210
206
                            run, run != 1 and "s" or "", timeTaken))
 
207
        self.stream.writeln()
211
208
        if not self.wasSuccessful():
212
209
            self.stream.write("FAILED (")
213
210
            failed, errored = map(len, (self.failures, self.errors))
220
217
                if failed or errored: self.stream.write(", ")
221
218
                self.stream.write("known_failure_count=%d" %
222
219
                    self.known_failure_count)
223
 
            self.stream.write(")\n")
 
220
            self.stream.writeln(")")
224
221
        else:
225
222
            if self.known_failure_count:
226
 
                self.stream.write("OK (known_failures=%d)\n" %
 
223
                self.stream.writeln("OK (known_failures=%d)" %
227
224
                    self.known_failure_count)
228
225
            else:
229
 
                self.stream.write("OK\n")
 
226
                self.stream.writeln("OK")
230
227
        if self.skip_count > 0:
231
228
            skipped = self.skip_count
232
 
            self.stream.write('%d test%s skipped\n' %
 
229
            self.stream.writeln('%d test%s skipped' %
233
230
                                (skipped, skipped != 1 and "s" or ""))
234
231
        if self.unsupported:
235
232
            for feature, count in sorted(self.unsupported.items()):
236
 
                self.stream.write("Missing feature '%s' skipped %d tests.\n" %
 
233
                self.stream.writeln("Missing feature '%s' skipped %d tests." %
237
234
                    (feature, count))
238
235
        if self._strict:
239
236
            ok = self.wasStrictlySuccessful()
240
237
        else:
241
238
            ok = self.wasSuccessful()
242
 
        if self._first_thread_leaker_id:
 
239
        if TestCase._first_thread_leaker_id:
243
240
            self.stream.write(
244
241
                '%s is leaking threads among %d leaking tests.\n' % (
245
 
                self._first_thread_leaker_id,
246
 
                self._tests_leaking_threads_count))
 
242
                TestCase._first_thread_leaker_id,
 
243
                TestCase._leaking_threads_tests))
247
244
            # We don't report the main thread as an active one.
248
245
            self.stream.write(
249
246
                '%d non-main threads were left active in the end.\n'
250
 
                % (len(self._active_threads) - 1))
 
247
                % (TestCase._active_threads - 1))
251
248
 
252
249
    def getDescription(self, test):
253
250
        return test.id()
277
274
 
278
275
    def _shortened_test_description(self, test):
279
276
        what = test.id()
280
 
        what = re.sub(r'^bzrlib\.tests\.', '', what)
 
277
        what = re.sub(r'^bzrlib\.(tests|benchmarks)\.', '', what)
281
278
        return what
282
279
 
283
280
    def startTest(self, test):
284
 
        super(ExtendedTestResult, self).startTest(test)
 
281
        unittest.TestResult.startTest(self, test)
285
282
        if self.count == 0:
286
283
            self.startTests()
287
 
        self.count += 1
288
284
        self.report_test_start(test)
289
285
        test.number = self.count
290
286
        self._recordTestStartTime()
291
 
        # Only check for thread leaks if the test case supports cleanups
292
 
        addCleanup = getattr(test, "addCleanup", None)
293
 
        if addCleanup is not None:
294
 
            addCleanup(self._check_leaked_threads, test)
295
287
 
296
288
    def startTests(self):
297
 
        self.report_tests_starting()
298
 
        self._active_threads = threading.enumerate()
299
 
 
300
 
    def _check_leaked_threads(self, test):
301
 
        """See if any threads have leaked since last call
302
 
 
303
 
        A sample of live threads is stored in the _active_threads attribute,
304
 
        when this method runs it compares the current live threads and any not
305
 
        in the previous sample are treated as having leaked.
306
 
        """
307
 
        now_active_threads = set(threading.enumerate())
308
 
        threads_leaked = now_active_threads.difference(self._active_threads)
309
 
        if threads_leaked:
310
 
            self._report_thread_leak(test, threads_leaked, now_active_threads)
311
 
            self._tests_leaking_threads_count += 1
312
 
            if self._first_thread_leaker_id is None:
313
 
                self._first_thread_leaker_id = test.id()
314
 
            self._active_threads = now_active_threads
 
289
        import platform
 
290
        if getattr(sys, 'frozen', None) is None:
 
291
            bzr_path = osutils.realpath(sys.argv[0])
 
292
        else:
 
293
            bzr_path = sys.executable
 
294
        self.stream.write(
 
295
            'bzr selftest: %s\n' % (bzr_path,))
 
296
        self.stream.write(
 
297
            '   %s\n' % (
 
298
                    bzrlib.__path__[0],))
 
299
        self.stream.write(
 
300
            '   bzr-%s python-%s %s\n' % (
 
301
                    bzrlib.version_string,
 
302
                    bzrlib._format_version_tuple(sys.version_info),
 
303
                    platform.platform(aliased=1),
 
304
                    ))
 
305
        self.stream.write('\n')
315
306
 
316
307
    def _recordTestStartTime(self):
317
308
        """Record that a test has started."""
318
309
        self._start_time = time.time()
319
310
 
 
311
    def _cleanupLogFile(self, test):
 
312
        # We can only do this if we have one of our TestCases, not if
 
313
        # we have a doctest.
 
314
        setKeepLogfile = getattr(test, 'setKeepLogfile', None)
 
315
        if setKeepLogfile is not None:
 
316
            setKeepLogfile()
 
317
 
320
318
    def addError(self, test, err):
321
319
        """Tell result that test finished with an error.
322
320
 
324
322
        fails with an unexpected error.
325
323
        """
326
324
        self._post_mortem()
327
 
        super(ExtendedTestResult, self).addError(test, err)
 
325
        unittest.TestResult.addError(self, test, err)
328
326
        self.error_count += 1
329
327
        self.report_error(test, err)
330
328
        if self.stop_early:
331
329
            self.stop()
 
330
        self._cleanupLogFile(test)
332
331
 
333
332
    def addFailure(self, test, err):
334
333
        """Tell result that test failed.
337
336
        fails because e.g. an assert() method failed.
338
337
        """
339
338
        self._post_mortem()
340
 
        super(ExtendedTestResult, self).addFailure(test, err)
 
339
        unittest.TestResult.addFailure(self, test, err)
341
340
        self.failure_count += 1
342
341
        self.report_failure(test, err)
343
342
        if self.stop_early:
344
343
            self.stop()
 
344
        self._cleanupLogFile(test)
345
345
 
346
346
    def addSuccess(self, test, details=None):
347
347
        """Tell result that test completed successfully.
355
355
                    self._formatTime(benchmark_time),
356
356
                    test.id()))
357
357
        self.report_success(test)
358
 
        super(ExtendedTestResult, self).addSuccess(test)
 
358
        self._cleanupLogFile(test)
 
359
        unittest.TestResult.addSuccess(self, test)
359
360
        test._log_contents = ''
360
361
 
361
362
    def addExpectedFailure(self, test, err):
398
399
        else:
399
400
            raise errors.BzrError("Unknown whence %r" % whence)
400
401
 
401
 
    def report_tests_starting(self):
402
 
        """Display information before the test run begins"""
403
 
        if getattr(sys, 'frozen', None) is None:
404
 
            bzr_path = osutils.realpath(sys.argv[0])
405
 
        else:
406
 
            bzr_path = sys.executable
407
 
        self.stream.write(
408
 
            'bzr selftest: %s\n' % (bzr_path,))
409
 
        self.stream.write(
410
 
            '   %s\n' % (
411
 
                    bzrlib.__path__[0],))
412
 
        self.stream.write(
413
 
            '   bzr-%s python-%s %s\n' % (
414
 
                    bzrlib.version_string,
415
 
                    bzrlib._format_version_tuple(sys.version_info),
416
 
                    platform.platform(aliased=1),
417
 
                    ))
418
 
        self.stream.write('\n')
419
 
 
420
 
    def report_test_start(self, test):
421
 
        """Display information on the test just about to be run"""
422
 
 
423
 
    def _report_thread_leak(self, test, leaked_threads, active_threads):
424
 
        """Display information on a test that leaked one or more threads"""
425
 
        # GZ 2010-09-09: A leak summary reported separately from the general
426
 
        #                thread debugging would be nice. Tests under subunit
427
 
        #                need something not using stream, perhaps adding a
428
 
        #                testtools details object would be fitting.
429
 
        if 'threads' in selftest_debug_flags:
430
 
            self.stream.write('%s is leaking, active is now %d\n' %
431
 
                (test.id(), len(active_threads)))
 
402
    def report_cleaning_up(self):
 
403
        pass
432
404
 
433
405
    def startTestRun(self):
434
406
        self.startTime = time.time()
471
443
        self.pb.finished()
472
444
        super(TextTestResult, self).stopTestRun()
473
445
 
474
 
    def report_tests_starting(self):
475
 
        super(TextTestResult, self).report_tests_starting()
 
446
    def startTestRun(self):
 
447
        super(TextTestResult, self).startTestRun()
476
448
        self.pb.update('[test 0/%d] Starting' % (self.num_tests))
477
449
 
 
450
    def printErrors(self):
 
451
        # clear the pb to make room for the error listing
 
452
        self.pb.clear()
 
453
        super(TextTestResult, self).printErrors()
 
454
 
478
455
    def _progress_prefix_text(self):
479
456
        # the longer this text, the less space we have to show the test
480
457
        # name...
502
479
        return a
503
480
 
504
481
    def report_test_start(self, test):
 
482
        self.count += 1
505
483
        self.pb.update(
506
484
                self._progress_prefix_text()
507
485
                + ' '
511
489
        return self._shortened_test_description(test)
512
490
 
513
491
    def report_error(self, test, err):
514
 
        self.stream.write('ERROR: %s\n    %s\n' % (
 
492
        self.ui.note('ERROR: %s\n    %s\n' % (
515
493
            self._test_description(test),
516
494
            err[1],
517
495
            ))
518
496
 
519
497
    def report_failure(self, test, err):
520
 
        self.stream.write('FAIL: %s\n    %s\n' % (
 
498
        self.ui.note('FAIL: %s\n    %s\n' % (
521
499
            self._test_description(test),
522
500
            err[1],
523
501
            ))
534
512
    def report_unsupported(self, test, feature):
535
513
        """test cannot be run because feature is missing."""
536
514
 
 
515
    def report_cleaning_up(self):
 
516
        self.pb.update('Cleaning up')
 
517
 
537
518
 
538
519
class VerboseTestResult(ExtendedTestResult):
539
520
    """Produce long output, with one line per test run plus times"""
546
527
            result = a_string
547
528
        return result.ljust(final_width)
548
529
 
549
 
    def report_tests_starting(self):
 
530
    def startTestRun(self):
 
531
        super(VerboseTestResult, self).startTestRun()
550
532
        self.stream.write('running %d tests...\n' % self.num_tests)
551
 
        super(VerboseTestResult, self).report_tests_starting()
552
533
 
553
534
    def report_test_start(self, test):
 
535
        self.count += 1
554
536
        name = self._shortened_test_description(test)
555
537
        width = osutils.terminal_width()
556
538
        if width is not None:
568
550
        return '%s%s' % (indent, err[1])
569
551
 
570
552
    def report_error(self, test, err):
571
 
        self.stream.write('ERROR %s\n%s\n'
 
553
        self.stream.writeln('ERROR %s\n%s'
572
554
                % (self._testTimeString(test),
573
555
                   self._error_summary(err)))
574
556
 
575
557
    def report_failure(self, test, err):
576
 
        self.stream.write(' FAIL %s\n%s\n'
 
558
        self.stream.writeln(' FAIL %s\n%s'
577
559
                % (self._testTimeString(test),
578
560
                   self._error_summary(err)))
579
561
 
580
562
    def report_known_failure(self, test, err):
581
 
        self.stream.write('XFAIL %s\n%s\n'
 
563
        self.stream.writeln('XFAIL %s\n%s'
582
564
                % (self._testTimeString(test),
583
565
                   self._error_summary(err)))
584
566
 
585
567
    def report_success(self, test):
586
 
        self.stream.write('   OK %s\n' % self._testTimeString(test))
 
568
        self.stream.writeln('   OK %s' % self._testTimeString(test))
587
569
        for bench_called, stats in getattr(test, '_benchcalls', []):
588
 
            self.stream.write('LSProf output for %s(%s, %s)\n' % bench_called)
 
570
            self.stream.writeln('LSProf output for %s(%s, %s)' % bench_called)
589
571
            stats.pprint(file=self.stream)
590
572
        # flush the stream so that we get smooth output. This verbose mode is
591
573
        # used to show the output in PQM.
592
574
        self.stream.flush()
593
575
 
594
576
    def report_skip(self, test, reason):
595
 
        self.stream.write(' SKIP %s\n%s\n'
 
577
        self.stream.writeln(' SKIP %s\n%s'
596
578
                % (self._testTimeString(test), reason))
597
579
 
598
580
    def report_not_applicable(self, test, reason):
599
 
        self.stream.write('  N/A %s\n    %s\n'
 
581
        self.stream.writeln('  N/A %s\n    %s'
600
582
                % (self._testTimeString(test), reason))
601
583
 
602
584
    def report_unsupported(self, test, feature):
603
585
        """test cannot be run because feature is missing."""
604
 
        self.stream.write("NODEP %s\n    The feature '%s' is not available.\n"
 
586
        self.stream.writeln("NODEP %s\n    The feature '%s' is not available."
605
587
                %(self._testTimeString(test), feature))
606
588
 
607
589
 
636
618
            encode = codec.encode
637
619
        stream = osutils.UnicodeOrBytesToBytesWriter(encode, stream)
638
620
        stream.encoding = new_encoding
639
 
        self.stream = stream
 
621
        self.stream = unittest._WritelnDecorator(stream)
640
622
        self.descriptions = descriptions
641
623
        self.verbosity = verbosity
642
624
        self._bench_history = bench_history
766
748
    # XXX: Should probably unify more with CannedInputUIFactory or a
767
749
    # particular configuration of TextUIFactory, or otherwise have a clearer
768
750
    # idea of how they're supposed to be different.
769
 
    # See https://bugs.launchpad.net/bzr/+bug/408213
 
751
    # See https://bugs.edge.launchpad.net/bzr/+bug/408213
770
752
 
771
753
    def __init__(self, stdout=None, stderr=None, stdin=None):
772
754
        if stdin is not None:
806
788
    routine, and to build and check bzr trees.
807
789
 
808
790
    In addition to the usual method of overriding tearDown(), this class also
809
 
    allows subclasses to register cleanup functions via addCleanup, which are
 
791
    allows subclasses to register functions into the _cleanups list, which is
810
792
    run in order as the object is torn down.  It's less likely this will be
811
793
    accidentally overlooked.
812
794
    """
813
795
 
814
 
    _log_file = None
 
796
    _active_threads = None
 
797
    _leaking_threads_tests = 0
 
798
    _first_thread_leaker_id = None
 
799
    _log_file_name = None
815
800
    # record lsprof data when performing benchmark calls.
816
801
    _gather_lsprof_in_benchmarks = False
817
802
 
818
803
    def __init__(self, methodName='testMethod'):
819
804
        super(TestCase, self).__init__(methodName)
 
805
        self._cleanups = []
820
806
        self._directory_isolation = True
821
807
        self.exception_handlers.insert(0,
822
808
            (UnavailableFeature, self._do_unsupported_or_skip))
840
826
        self._track_transports()
841
827
        self._track_locks()
842
828
        self._clear_debug_flags()
 
829
        TestCase._active_threads = threading.activeCount()
 
830
        self.addCleanup(self._check_leaked_threads)
843
831
 
844
832
    def debug(self):
845
833
        # debug a frame up.
846
834
        import pdb
847
835
        pdb.Pdb().set_trace(sys._getframe().f_back)
848
836
 
849
 
    def discardDetail(self, name):
850
 
        """Extend the addDetail, getDetails api so we can remove a detail.
851
 
 
852
 
        eg. bzr always adds the 'log' detail at startup, but we don't want to
853
 
        include it for skipped, xfail, etc tests.
854
 
 
855
 
        It is safe to call this for a detail that doesn't exist, in case this
856
 
        gets called multiple times.
857
 
        """
858
 
        # We cheat. details is stored in __details which means we shouldn't
859
 
        # touch it. but getDetails() returns the dict directly, so we can
860
 
        # mutate it.
861
 
        details = self.getDetails()
862
 
        if name in details:
863
 
            del details[name]
 
837
    def _check_leaked_threads(self):
 
838
        active = threading.activeCount()
 
839
        leaked_threads = active - TestCase._active_threads
 
840
        TestCase._active_threads = active
 
841
        # If some tests make the number of threads *decrease*, we'll consider
 
842
        # that they are just observing old threads dieing, not agressively kill
 
843
        # random threads. So we don't report these tests as leaking. The risk
 
844
        # is that we have false positives that way (the test see 2 threads
 
845
        # going away but leak one) but it seems less likely than the actual
 
846
        # false positives (the test see threads going away and does not leak).
 
847
        if leaked_threads > 0:
 
848
            TestCase._leaking_threads_tests += 1
 
849
            if TestCase._first_thread_leaker_id is None:
 
850
                TestCase._first_thread_leaker_id = self.id()
864
851
 
865
852
    def _clear_debug_flags(self):
866
853
        """Prevent externally set debug flags affecting tests.
955
942
 
956
943
    def permit_dir(self, name):
957
944
        """Permit a directory to be used by this test. See permit_url."""
958
 
        name_transport = _mod_transport.get_transport(name)
 
945
        name_transport = get_transport(name)
959
946
        self.permit_url(name)
960
947
        self.permit_url(name_transport.base)
961
948
 
984
971
            try:
985
972
                workingtree.WorkingTree.open(path)
986
973
            except (errors.NotBranchError, errors.NoWorkingTree):
987
 
                raise TestSkipped('Needs a working tree of bzr sources')
 
974
                return
988
975
        finally:
989
976
            self.enable_directory_isolation()
990
977
 
1040
1027
        self.addCleanup(transport_server.stop_server)
1041
1028
        # Obtain a real transport because if the server supplies a password, it
1042
1029
        # will be hidden from the base on the client side.
1043
 
        t = _mod_transport.get_transport(transport_server.get_url())
 
1030
        t = get_transport(transport_server.get_url())
1044
1031
        # Some transport servers effectively chroot the backing transport;
1045
1032
        # others like SFTPServer don't - users of the transport can walk up the
1046
1033
        # transport to read the entire backing transport. This wouldn't matter
1107
1094
            message += '\n'
1108
1095
        raise AssertionError("%snot equal:\na = %s\nb = %s\n"
1109
1096
            % (message,
1110
 
               pprint.pformat(a), pprint.pformat(b)))
 
1097
               pformat(a), pformat(b)))
1111
1098
 
1112
1099
    assertEquals = assertEqual
1113
1100
 
1325
1312
            f.close()
1326
1313
        self.assertEqualDiff(content, s)
1327
1314
 
1328
 
    def assertDocstring(self, expected_docstring, obj):
1329
 
        """Fail if obj does not have expected_docstring"""
1330
 
        if __doc__ is None:
1331
 
            # With -OO the docstring should be None instead
1332
 
            self.assertIs(obj.__doc__, None)
1333
 
        else:
1334
 
            self.assertEqual(expected_docstring, obj.__doc__)
1335
 
 
1336
1315
    def failUnlessExists(self, path):
1337
1316
        """Fail unless path or paths, which may be abs or relative, exist."""
1338
1317
        if not isinstance(path, basestring):
1467
1446
 
1468
1447
        The file is removed as the test is torn down.
1469
1448
        """
1470
 
        self._log_file = StringIO()
 
1449
        fileno, name = tempfile.mkstemp(suffix='.log', prefix='testbzr')
 
1450
        self._log_file = os.fdopen(fileno, 'w+')
1471
1451
        self._log_memento = bzrlib.trace.push_log_file(self._log_file)
 
1452
        self._log_file_name = name
1472
1453
        self.addCleanup(self._finishLogFile)
1473
1454
 
1474
1455
    def _finishLogFile(self):
1496
1477
        """
1497
1478
        debug.debug_flags.discard('strict_locks')
1498
1479
 
 
1480
    def addCleanup(self, callable, *args, **kwargs):
 
1481
        """Arrange to run a callable when this case is torn down.
 
1482
 
 
1483
        Callables are run in the reverse of the order they are registered,
 
1484
        ie last-in first-out.
 
1485
        """
 
1486
        self._cleanups.append((callable, args, kwargs))
 
1487
 
1499
1488
    def overrideAttr(self, obj, attr_name, new=_unitialized_attr):
1500
1489
        """Overrides an object attribute restoring it after the test.
1501
1490
 
1526
1515
            'EDITOR': None,
1527
1516
            'BZR_EMAIL': None,
1528
1517
            'BZREMAIL': None, # may still be present in the environment
1529
 
            'EMAIL': 'jrandom@example.com', # set EMAIL as bzr does not guess
 
1518
            'EMAIL': None,
1530
1519
            'BZR_PROGRESS_BAR': None,
1531
1520
            'BZR_LOG': None,
1532
1521
            'BZR_PLUGIN_PATH': None,
1533
 
            'BZR_DISABLE_PLUGINS': None,
1534
 
            'BZR_PLUGINS_AT': None,
1535
1522
            'BZR_CONCURRENCY': None,
1536
1523
            # Make sure that any text ui tests are consistent regardless of
1537
1524
            # the environment the test case is run in; you may want tests that
1585
1572
        """This test has failed for some known reason."""
1586
1573
        raise KnownFailure(reason)
1587
1574
 
1588
 
    def _suppress_log(self):
1589
 
        """Remove the log info from details."""
1590
 
        self.discardDetail('log')
1591
 
 
1592
1575
    def _do_skip(self, result, reason):
1593
 
        self._suppress_log()
1594
1576
        addSkip = getattr(result, 'addSkip', None)
1595
1577
        if not callable(addSkip):
1596
1578
            result.addSuccess(result)
1599
1581
 
1600
1582
    @staticmethod
1601
1583
    def _do_known_failure(self, result, e):
1602
 
        self._suppress_log()
1603
1584
        err = sys.exc_info()
1604
1585
        addExpectedFailure = getattr(result, 'addExpectedFailure', None)
1605
1586
        if addExpectedFailure is not None:
1613
1594
            reason = 'No reason given'
1614
1595
        else:
1615
1596
            reason = e.args[0]
1616
 
        self._suppress_log ()
1617
1597
        addNotApplicable = getattr(result, 'addNotApplicable', None)
1618
1598
        if addNotApplicable is not None:
1619
1599
            result.addNotApplicable(self, reason)
1621
1601
            self._do_skip(result, reason)
1622
1602
 
1623
1603
    @staticmethod
1624
 
    def _report_skip(self, result, err):
1625
 
        """Override the default _report_skip.
1626
 
 
1627
 
        We want to strip the 'log' detail. If we waint until _do_skip, it has
1628
 
        already been formatted into the 'reason' string, and we can't pull it
1629
 
        out again.
1630
 
        """
1631
 
        self._suppress_log()
1632
 
        super(TestCase, self)._report_skip(self, result, err)
1633
 
 
1634
 
    @staticmethod
1635
 
    def _report_expected_failure(self, result, err):
1636
 
        """Strip the log.
1637
 
 
1638
 
        See _report_skip for motivation.
1639
 
        """
1640
 
        self._suppress_log()
1641
 
        super(TestCase, self)._report_expected_failure(self, result, err)
1642
 
 
1643
 
    @staticmethod
1644
1604
    def _do_unsupported_or_skip(self, result, e):
1645
1605
        reason = e.args[0]
1646
 
        self._suppress_log()
1647
1606
        addNotSupported = getattr(result, 'addNotSupported', None)
1648
1607
        if addNotSupported is not None:
1649
1608
            result.addNotSupported(self, reason)
1696
1655
                unicodestr = self._log_contents.decode('utf8', 'replace')
1697
1656
                self._log_contents = unicodestr.encode('utf8')
1698
1657
            return self._log_contents
1699
 
        if self._log_file is not None:
1700
 
            log_contents = self._log_file.getvalue()
 
1658
        import bzrlib.trace
 
1659
        if bzrlib.trace._trace_file:
 
1660
            # flush the log file, to get all content
 
1661
            bzrlib.trace._trace_file.flush()
 
1662
        if self._log_file_name is not None:
 
1663
            logfile = open(self._log_file_name)
 
1664
            try:
 
1665
                log_contents = logfile.read()
 
1666
            finally:
 
1667
                logfile.close()
1701
1668
            try:
1702
1669
                log_contents.decode('utf8')
1703
1670
            except UnicodeDecodeError:
1704
1671
                unicodestr = log_contents.decode('utf8', 'replace')
1705
1672
                log_contents = unicodestr.encode('utf8')
1706
1673
            if not keep_log_file:
 
1674
                self._log_file.close()
1707
1675
                self._log_file = None
1708
1676
                # Permit multiple calls to get_log until we clean it up in
1709
1677
                # finishLogFile
1710
1678
                self._log_contents = log_contents
 
1679
                try:
 
1680
                    os.remove(self._log_file_name)
 
1681
                except OSError, e:
 
1682
                    if sys.platform == 'win32' and e.errno == errno.EACCES:
 
1683
                        sys.stderr.write(('Unable to delete log file '
 
1684
                                             ' %r\n' % self._log_file_name))
 
1685
                    else:
 
1686
                        raise
 
1687
                self._log_file_name = None
1711
1688
            return log_contents
1712
1689
        else:
1713
 
            return "No log file content."
 
1690
            return "No log file content and no log file name."
1714
1691
 
1715
1692
    def get_log(self):
1716
1693
        """Get a unicode string containing the log from bzrlib.trace.
1931
1908
            variables. A value of None will unset the env variable.
1932
1909
            The values must be strings. The change will only occur in the
1933
1910
            child, so you don't need to fix the environment after running.
1934
 
        :param skip_if_plan_to_signal: raise TestSkipped when true and system
1935
 
            doesn't support signalling subprocesses.
 
1911
        :param skip_if_plan_to_signal: raise TestSkipped when true and os.kill
 
1912
            is not available.
1936
1913
        :param allow_plugins: If False (default) pass --no-plugins to bzr.
1937
1914
 
1938
1915
        :returns: Popen object for the started process.
1939
1916
        """
1940
1917
        if skip_if_plan_to_signal:
1941
 
            if os.name != "posix":
1942
 
                raise TestSkipped("Sending signals not supported")
 
1918
            if not getattr(os, 'kill', None):
 
1919
                raise TestSkipped("os.kill not available.")
1943
1920
 
1944
1921
        if env_changes is None:
1945
1922
            env_changes = {}
1972
1949
            if not allow_plugins:
1973
1950
                command.append('--no-plugins')
1974
1951
            command.extend(process_args)
1975
 
            process = self._popen(command, stdin=subprocess.PIPE,
1976
 
                                  stdout=subprocess.PIPE,
1977
 
                                  stderr=subprocess.PIPE)
 
1952
            process = self._popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE)
1978
1953
        finally:
1979
1954
            restore_environment()
1980
1955
            if cwd is not None:
1988
1963
        Allows tests to override this method to intercept the calls made to
1989
1964
        Popen for introspection.
1990
1965
        """
1991
 
        return subprocess.Popen(*args, **kwargs)
 
1966
        return Popen(*args, **kwargs)
1992
1967
 
1993
1968
    def get_source_path(self):
1994
1969
        """Return the path of the directory containing bzrlib."""
1996
1971
 
1997
1972
    def get_bzr_path(self):
1998
1973
        """Return the path of the 'bzr' executable for this test suite."""
1999
 
        bzr_path = os.path.join(self.get_source_path(), "bzr")
 
1974
        bzr_path = self.get_source_path()+'/bzr'
2000
1975
        if not os.path.isfile(bzr_path):
2001
1976
            # We are probably installed. Assume sys.argv is the right file
2002
1977
            bzr_path = sys.argv[0]
2174
2149
 
2175
2150
        :param relpath: a path relative to the base url.
2176
2151
        """
2177
 
        t = _mod_transport.get_transport(self.get_url(relpath))
 
2152
        t = get_transport(self.get_url(relpath))
2178
2153
        self.assertFalse(t.is_readonly())
2179
2154
        return t
2180
2155
 
2186
2161
 
2187
2162
        :param relpath: a path relative to the base url.
2188
2163
        """
2189
 
        t = _mod_transport.get_transport(self.get_readonly_url(relpath))
 
2164
        t = get_transport(self.get_readonly_url(relpath))
2190
2165
        self.assertTrue(t.is_readonly())
2191
2166
        return t
2192
2167
 
2322
2297
        propagating. This method ensures than a test did not leaked.
2323
2298
        """
2324
2299
        root = TestCaseWithMemoryTransport.TEST_ROOT
2325
 
        self.permit_url(_mod_transport.get_transport(root).base)
 
2300
        self.permit_url(get_transport(root).base)
2326
2301
        wt = workingtree.WorkingTree.open(root)
2327
2302
        last_rev = wt.last_revision()
2328
2303
        if last_rev != 'null:':
2373
2348
            # might be a relative or absolute path
2374
2349
            maybe_a_url = self.get_url(relpath)
2375
2350
            segments = maybe_a_url.rsplit('/', 1)
2376
 
            t = _mod_transport.get_transport(maybe_a_url)
 
2351
            t = get_transport(maybe_a_url)
2377
2352
            if len(segments) > 1 and segments[-1] not in ('', '.'):
2378
2353
                t.ensure_base()
2379
2354
            if format is None:
2396
2371
        made_control = self.make_bzrdir(relpath, format=format)
2397
2372
        return made_control.create_repository(shared=shared)
2398
2373
 
2399
 
    def make_smart_server(self, path, backing_server=None):
2400
 
        if backing_server is None:
2401
 
            backing_server = self.get_server()
 
2374
    def make_smart_server(self, path):
2402
2375
        smart_server = test_server.SmartTCPServer_for_testing()
2403
 
        self.start_server(smart_server, backing_server)
2404
 
        remote_transport = _mod_transport.get_transport(smart_server.get_url()
2405
 
                                                   ).clone(path)
 
2376
        self.start_server(smart_server, self.get_server())
 
2377
        remote_transport = get_transport(smart_server.get_url()).clone(path)
2406
2378
        return remote_transport
2407
2379
 
2408
2380
    def make_branch_and_memory_tree(self, relpath, format=None):
2423
2395
 
2424
2396
    def setUp(self):
2425
2397
        super(TestCaseWithMemoryTransport, self).setUp()
2426
 
        # Ensure that ConnectedTransport doesn't leak sockets
2427
 
        def get_transport_with_cleanup(*args, **kwargs):
2428
 
            t = orig_get_transport(*args, **kwargs)
2429
 
            if isinstance(t, _mod_transport.ConnectedTransport):
2430
 
                self.addCleanup(t.disconnect)
2431
 
            return t
2432
 
 
2433
 
        orig_get_transport = self.overrideAttr(_mod_transport, 'get_transport',
2434
 
                                               get_transport_with_cleanup)
2435
2398
        self._make_test_root()
2436
2399
        self.addCleanup(os.chdir, os.getcwdu())
2437
2400
        self.makeAndChdirToTestDir()
2482
2445
 
2483
2446
    def check_file_contents(self, filename, expect):
2484
2447
        self.log("check contents of file %s" % filename)
2485
 
        f = file(filename)
2486
 
        try:
2487
 
            contents = f.read()
2488
 
        finally:
2489
 
            f.close()
 
2448
        contents = file(filename, 'r').read()
2490
2449
        if contents != expect:
2491
2450
            self.log("expected: %r" % expect)
2492
2451
            self.log("actually: %r" % contents)
2566
2525
                "a list or a tuple. Got %r instead" % (shape,))
2567
2526
        # It's OK to just create them using forward slashes on windows.
2568
2527
        if transport is None or transport.is_readonly():
2569
 
            transport = _mod_transport.get_transport(".")
 
2528
            transport = get_transport(".")
2570
2529
        for name in shape:
2571
2530
            self.assertIsInstance(name, basestring)
2572
2531
            if name[-1] == '/':
2582
2541
                content = "contents of %s%s" % (name.encode('utf-8'), end)
2583
2542
                transport.put_bytes_non_atomic(urlutils.escape(name), content)
2584
2543
 
2585
 
    build_tree_contents = staticmethod(treeshape.build_tree_contents)
 
2544
    def build_tree_contents(self, shape):
 
2545
        build_tree_contents(shape)
2586
2546
 
2587
2547
    def assertInWorkingTree(self, path, root_path='.', tree=None):
2588
2548
        """Assert whether path or paths are in the WorkingTree"""
2729
2689
    """
2730
2690
 
2731
2691
    def setUp(self):
2732
 
        from bzrlib.tests import http_server
2733
2692
        super(ChrootedTestCase, self).setUp()
2734
2693
        if not self.vfs_transport_factory == memory.MemoryServer:
2735
 
            self.transport_readonly_server = http_server.HttpServer
 
2694
            self.transport_readonly_server = HttpServer
2736
2695
 
2737
2696
 
2738
2697
def condition_id_re(pattern):
2741
2700
    :param pattern: A regular expression string.
2742
2701
    :return: A callable that returns True if the re matches.
2743
2702
    """
2744
 
    filter_re = re.compile(pattern, 0)
 
2703
    filter_re = osutils.re_compile_checked(pattern, 0,
 
2704
        'test filter')
2745
2705
    def condition(test):
2746
2706
        test_id = test.id()
2747
2707
        return filter_re.search(test_id)
2999
2959
 
3000
2960
 
3001
2961
def fork_decorator(suite):
3002
 
    if getattr(os, "fork", None) is None:
3003
 
        raise errors.BzrCommandError("platform does not support fork,"
3004
 
            " try --parallel=subprocess instead.")
3005
2962
    concurrency = osutils.local_concurrency()
3006
2963
    if concurrency == 1:
3007
2964
        return suite
3062
3019
    return suite
3063
3020
 
3064
3021
 
3065
 
class TestDecorator(TestUtil.TestSuite):
 
3022
class TestDecorator(TestSuite):
3066
3023
    """A decorator for TestCase/TestSuite objects.
3067
3024
    
3068
3025
    Usually, subclasses should override __iter__(used when flattening test
3071
3028
    """
3072
3029
 
3073
3030
    def __init__(self, suite):
3074
 
        TestUtil.TestSuite.__init__(self)
 
3031
        TestSuite.__init__(self)
3075
3032
        self.addTest(suite)
3076
3033
 
3077
3034
    def countTestCases(self):
3196
3153
 
3197
3154
def partition_tests(suite, count):
3198
3155
    """Partition suite into count lists of tests."""
3199
 
    # This just assigns tests in a round-robin fashion.  On one hand this
3200
 
    # splits up blocks of related tests that might run faster if they shared
3201
 
    # resources, but on the other it avoids assigning blocks of slow tests to
3202
 
    # just one partition.  So the slowest partition shouldn't be much slower
3203
 
    # than the fastest.
3204
 
    partitions = [list() for i in range(count)]
3205
 
    tests = iter_suite_tests(suite)
3206
 
    for partition, test in itertools.izip(itertools.cycle(partitions), tests):
3207
 
        partition.append(test)
3208
 
    return partitions
3209
 
 
3210
 
 
3211
 
def workaround_zealous_crypto_random():
3212
 
    """Crypto.Random want to help us being secure, but we don't care here.
3213
 
 
3214
 
    This workaround some test failure related to the sftp server. Once paramiko
3215
 
    stop using the controversial API in Crypto.Random, we may get rid of it.
3216
 
    """
3217
 
    try:
3218
 
        from Crypto.Random import atfork
3219
 
        atfork()
3220
 
    except ImportError:
3221
 
        pass
 
3156
    result = []
 
3157
    tests = list(iter_suite_tests(suite))
 
3158
    tests_per_process = int(math.ceil(float(len(tests)) / count))
 
3159
    for block in range(count):
 
3160
        low_test = block * tests_per_process
 
3161
        high_test = low_test + tests_per_process
 
3162
        process_tests = tests[low_test:high_test]
 
3163
        result.append(process_tests)
 
3164
    return result
3222
3165
 
3223
3166
 
3224
3167
def fork_for_tests(suite):
3241
3184
            try:
3242
3185
                ProtocolTestCase.run(self, result)
3243
3186
            finally:
3244
 
                os.waitpid(self.pid, 0)
 
3187
                os.waitpid(self.pid, os.WNOHANG)
3245
3188
 
3246
3189
    test_blocks = partition_tests(suite, concurrency)
3247
3190
    for process_tests in test_blocks:
3248
 
        process_suite = TestUtil.TestSuite()
 
3191
        process_suite = TestSuite()
3249
3192
        process_suite.addTests(process_tests)
3250
3193
        c2pread, c2pwrite = os.pipe()
3251
3194
        pid = os.fork()
3252
3195
        if pid == 0:
3253
 
            workaround_zealous_crypto_random()
3254
3196
            try:
3255
3197
                os.close(c2pread)
3256
3198
                # Leave stderr and stdout open so we can see test noise
3317
3259
                '--subunit']
3318
3260
            if '--no-plugins' in sys.argv:
3319
3261
                argv.append('--no-plugins')
3320
 
            # stderr=subprocess.STDOUT would be ideal, but until we prevent
3321
 
            # noise on stderr it can interrupt the subunit protocol.
3322
 
            process = subprocess.Popen(argv, stdin=subprocess.PIPE,
3323
 
                                      stdout=subprocess.PIPE,
3324
 
                                      stderr=subprocess.PIPE,
3325
 
                                      bufsize=1)
 
3262
            # stderr=STDOUT would be ideal, but until we prevent noise on
 
3263
            # stderr it can interrupt the subunit protocol.
 
3264
            process = Popen(argv, stdin=PIPE, stdout=PIPE, stderr=PIPE,
 
3265
                bufsize=1)
3326
3266
            test = TestInSubprocess(process, test_list_file_name)
3327
3267
            result.append(test)
3328
3268
        except:
3377
3317
 
3378
3318
    def startTest(self, test):
3379
3319
        self.profiler = bzrlib.lsprof.BzrProfiler()
3380
 
        # Prevent deadlocks in tests that use lsprof: those tests will
3381
 
        # unavoidably fail.
3382
 
        bzrlib.lsprof.BzrProfiler.profiler_block = 0
3383
3320
        self.profiler.start()
3384
3321
        ForwardingResult.startTest(self, test)
3385
3322
 
3406
3343
#                           rather than failing tests. And no longer raise
3407
3344
#                           LockContention when fctnl locks are not being used
3408
3345
#                           with proper exclusion rules.
3409
 
#   -Ethreads               Will display thread ident at creation/join time to
3410
 
#                           help track thread leaks
3411
3346
selftest_debug_flags = set()
3412
3347
 
3413
3348
 
3646
3581
        'bzrlib.doc',
3647
3582
        'bzrlib.tests.blackbox',
3648
3583
        'bzrlib.tests.commands',
3649
 
        'bzrlib.tests.doc_generate',
3650
3584
        'bzrlib.tests.per_branch',
3651
3585
        'bzrlib.tests.per_bzrdir',
3652
 
        'bzrlib.tests.per_controldir',
3653
 
        'bzrlib.tests.per_controldir_colo',
3654
3586
        'bzrlib.tests.per_foreign_vcs',
3655
3587
        'bzrlib.tests.per_interrepository',
3656
3588
        'bzrlib.tests.per_intertree',
3669
3601
        'bzrlib.tests.per_workingtree',
3670
3602
        'bzrlib.tests.test__annotator',
3671
3603
        'bzrlib.tests.test__bencode',
3672
 
        'bzrlib.tests.test__btree_serializer',
3673
3604
        'bzrlib.tests.test__chk_map',
3674
3605
        'bzrlib.tests.test__dirstate_helpers',
3675
3606
        'bzrlib.tests.test__groupcompress',
3718
3649
        'bzrlib.tests.test_export',
3719
3650
        'bzrlib.tests.test_extract',
3720
3651
        'bzrlib.tests.test_fetch',
3721
 
        'bzrlib.tests.test_fixtures',
3722
3652
        'bzrlib.tests.test_fifo_cache',
3723
3653
        'bzrlib.tests.test_filters',
3724
3654
        'bzrlib.tests.test_ftp_transport',
3745
3675
        'bzrlib.tests.test_knit',
3746
3676
        'bzrlib.tests.test_lazy_import',
3747
3677
        'bzrlib.tests.test_lazy_regex',
3748
 
        'bzrlib.tests.test_library_state',
3749
3678
        'bzrlib.tests.test_lock',
3750
3679
        'bzrlib.tests.test_lockable_files',
3751
3680
        'bzrlib.tests.test_lockdir',
3753
3682
        'bzrlib.tests.test_lru_cache',
3754
3683
        'bzrlib.tests.test_lsprof',
3755
3684
        'bzrlib.tests.test_mail_client',
3756
 
        'bzrlib.tests.test_matchers',
3757
3685
        'bzrlib.tests.test_memorytree',
3758
3686
        'bzrlib.tests.test_merge',
3759
3687
        'bzrlib.tests.test_merge3',
3808
3736
        'bzrlib.tests.test_switch',
3809
3737
        'bzrlib.tests.test_symbol_versioning',
3810
3738
        'bzrlib.tests.test_tag',
3811
 
        'bzrlib.tests.test_test_server',
3812
3739
        'bzrlib.tests.test_testament',
3813
3740
        'bzrlib.tests.test_textfile',
3814
3741
        'bzrlib.tests.test_textmerge',
3820
3747
        'bzrlib.tests.test_transport_log',
3821
3748
        'bzrlib.tests.test_tree',
3822
3749
        'bzrlib.tests.test_treebuilder',
3823
 
        'bzrlib.tests.test_treeshape',
3824
3750
        'bzrlib.tests.test_tsort',
3825
3751
        'bzrlib.tests.test_tuned_gzip',
3826
3752
        'bzrlib.tests.test_ui',
3830
3756
        'bzrlib.tests.test_urlutils',
3831
3757
        'bzrlib.tests.test_version',
3832
3758
        'bzrlib.tests.test_version_info',
3833
 
        'bzrlib.tests.test_versionedfile',
3834
3759
        'bzrlib.tests.test_weave',
3835
3760
        'bzrlib.tests.test_whitebox',
3836
3761
        'bzrlib.tests.test_win32utils',
3842
3767
 
3843
3768
 
3844
3769
def _test_suite_modules_to_doctest():
3845
 
    """Return the list of modules to doctest."""
3846
 
    if __doc__ is None:
3847
 
        # GZ 2009-03-31: No docstrings with -OO so there's nothing to doctest
3848
 
        return []
 
3770
    """Return the list of modules to doctest."""   
3849
3771
    return [
3850
3772
        'bzrlib',
3851
3773
        'bzrlib.branchbuilder',
3858
3780
        'bzrlib.option',
3859
3781
        'bzrlib.symbol_versioning',
3860
3782
        'bzrlib.tests',
3861
 
        'bzrlib.tests.fixtures',
3862
3783
        'bzrlib.timestamp',
3863
3784
        'bzrlib.version_info_formats.format_custom',
3864
3785
        ]
4005
3926
    ...     bzrlib.tests.test_sampler.DemoTest('test_nothing'),
4006
3927
    ...     [('one', dict(param=1)),
4007
3928
    ...      ('two', dict(param=2))],
4008
 
    ...     TestUtil.TestSuite())
 
3929
    ...     TestSuite())
4009
3930
    >>> tests = list(iter_suite_tests(r))
4010
3931
    >>> len(tests)
4011
3932
    2
4058
3979
    :param new_id: The id to assign to it.
4059
3980
    :return: The new test.
4060
3981
    """
4061
 
    new_test = copy.copy(test)
 
3982
    new_test = copy(test)
4062
3983
    new_test.id = lambda: new_id
4063
 
    # XXX: Workaround <https://bugs.launchpad.net/testtools/+bug/637725>, which
4064
 
    # causes cloned tests to share the 'details' dict.  This makes it hard to
4065
 
    # read the test output for parameterized tests, because tracebacks will be
4066
 
    # associated with irrelevant tests.
4067
 
    try:
4068
 
        details = new_test._TestCase__details
4069
 
    except AttributeError:
4070
 
        # must be a different version of testtools than expected.  Do nothing.
4071
 
        pass
4072
 
    else:
4073
 
        # Reset the '__details' dict.
4074
 
        new_test._TestCase__details = {}
4075
3984
    return new_test
4076
3985
 
4077
3986
 
4137
4046
        if test_id != None:
4138
4047
            ui.ui_factory.clear_term()
4139
4048
            sys.stderr.write('\nWhile running: %s\n' % (test_id,))
4140
 
        # Ugly, but the last thing we want here is fail, so bear with it.
4141
 
        printable_e = str(e).decode(osutils.get_user_encoding(), 'replace'
4142
 
                                    ).encode('ascii', 'replace')
4143
4049
        sys.stderr.write('Unable to remove testing dir %s\n%s'
4144
 
                         % (os.path.basename(dirname), printable_e))
 
4050
                         % (os.path.basename(dirname), e))
4145
4051
 
4146
4052
 
4147
4053
class Feature(object):
4377
4283
UnicodeFilename = _UnicodeFilename()
4378
4284
 
4379
4285
 
4380
 
class _ByteStringNamedFilesystem(Feature):
4381
 
    """Is the filesystem based on bytes?"""
4382
 
 
4383
 
    def _probe(self):
4384
 
        if os.name == "posix":
4385
 
            return True
4386
 
        return False
4387
 
 
4388
 
ByteStringNamedFilesystem = _ByteStringNamedFilesystem()
4389
 
 
4390
 
 
4391
4286
class _UTF8Filesystem(Feature):
4392
4287
    """Is the filesystem UTF-8?"""
4393
4288
 
4503
4398
try:
4504
4399
    from subunit import TestProtocolClient
4505
4400
    from subunit.test_results import AutoTimingTestResultDecorator
4506
 
    class SubUnitBzrProtocolClient(TestProtocolClient):
4507
 
 
4508
 
        def addSuccess(self, test, details=None):
4509
 
            # The subunit client always includes the details in the subunit
4510
 
            # stream, but we don't want to include it in ours.
4511
 
            if details is not None and 'log' in details:
4512
 
                del details['log']
4513
 
            return super(SubUnitBzrProtocolClient, self).addSuccess(
4514
 
                test, details)
4515
 
 
4516
4401
    class SubUnitBzrRunner(TextTestRunner):
4517
4402
        def run(self, test):
4518
4403
            result = AutoTimingTestResultDecorator(
4519
 
                SubUnitBzrProtocolClient(self.stream))
 
4404
                TestProtocolClient(self.stream))
4520
4405
            test.run(result)
4521
4406
            return result
4522
4407
except ImportError:
4523
4408
    pass
4524
 
 
4525
 
class _PosixPermissionsFeature(Feature):
4526
 
 
4527
 
    def _probe(self):
4528
 
        def has_perms():
4529
 
            # create temporary file and check if specified perms are maintained.
4530
 
            import tempfile
4531
 
 
4532
 
            write_perms = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
4533
 
            f = tempfile.mkstemp(prefix='bzr_perms_chk_')
4534
 
            fd, name = f
4535
 
            os.close(fd)
4536
 
            os.chmod(name, write_perms)
4537
 
 
4538
 
            read_perms = os.stat(name).st_mode & 0777
4539
 
            os.unlink(name)
4540
 
            return (write_perms == read_perms)
4541
 
 
4542
 
        return (os.name == 'posix') and has_perms()
4543
 
 
4544
 
    def feature_name(self):
4545
 
        return 'POSIX permissions support'
4546
 
 
4547
 
posix_permissions_feature = _PosixPermissionsFeature()