~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_progress.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-03-28 06:58:22 UTC
  • mfrom: (2379.2.3 hpss-chroot)
  • Revision ID: pqm@pqm.ubuntu.com-20070328065822-999550a858a3ced3
(robertc) Fix chroot urls to not expose the url of the transport they are protecting, allowing regular url operations to work on them. (Robert Collins, Andrew Bennetts)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2006-2011 Canonical Ltd
 
1
# Copyright (C) 2006 Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
from cStringIO import StringIO
19
 
 
20
 
from bzrlib import (
21
 
    tests,
22
 
    )
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
import os
 
18
from StringIO import StringIO
 
19
 
 
20
from bzrlib import errors
23
21
from bzrlib.progress import (
24
 
    ProgressTask,
25
 
    )
26
 
from bzrlib.ui.text import (
27
 
    TextProgressView,
28
 
    )
29
 
 
30
 
 
31
 
class TestTextProgressView(tests.TestCase):
32
 
    """Tests for text display of progress bars.
33
 
 
34
 
    These try to exercise the progressview independently of its construction,
35
 
    which is arranged by the TextUIFactory.
36
 
    """
37
 
    # The ProgressTask now connects directly to the ProgressView, so we can
38
 
    # check them independently of the factory or of the determination of what
39
 
    # view to use.
40
 
 
41
 
    def make_view_only(self, out, width=79):
42
 
        view = TextProgressView(out)
43
 
        view._avail_width = lambda: width
44
 
        return view
45
 
 
46
 
    def make_view(self):
47
 
        out = StringIO()
48
 
        return out, self.make_view_only(out)
49
 
 
50
 
    def make_task(self, parent_task, view, msg, curr, total):
51
 
        # would normally be done by UIFactory; is done here so that we don't
52
 
        # have to have one.
53
 
        task = ProgressTask(parent_task, progress_view=view)
54
 
        task.msg = msg
55
 
        task.current_cnt = curr
56
 
        task.total_cnt = total
57
 
        return task
 
22
        DummyProgress, ChildProgress,
 
23
        TTYProgressBar,
 
24
        DotsProgressBar,
 
25
        ProgressBarStack,
 
26
        )
 
27
from bzrlib.tests import TestCase
 
28
 
 
29
 
 
30
class FakeStack:
 
31
    def __init__(self, top):
 
32
        self.__top = top
 
33
 
 
34
    def top(self):
 
35
        return self.__top
 
36
 
 
37
class InstrumentedProgress(TTYProgressBar):
 
38
    """TTYProgress variant that tracks outcomes"""
 
39
 
 
40
    def __init__(self, *args, **kwargs):
 
41
        self.always_throttled = True
 
42
        TTYProgressBar.__init__(self, *args, **kwargs)
 
43
 
 
44
    def throttle(self, old_message):
 
45
        result = TTYProgressBar.throttle(self, old_message)
 
46
        if result is False:
 
47
            self.always_throttled = False
 
48
        
 
49
 
 
50
class _TTYStringIO(StringIO):
 
51
    """A helper class which makes a StringIO look like a terminal"""
 
52
 
 
53
    def isatty(self):
 
54
        return True
 
55
 
 
56
 
 
57
class _NonTTYStringIO(StringIO):
 
58
    """Helper that implements isatty() but returns False"""
 
59
 
 
60
    def isatty(self):
 
61
        return False
 
62
 
 
63
 
 
64
class TestProgress(TestCase):
 
65
    def setUp(self):
 
66
        q = DummyProgress()
 
67
        self.top = ChildProgress(_stack=FakeStack(q))
 
68
 
 
69
    def test_propogation(self):
 
70
        self.top.update('foobles', 1, 2)
 
71
        self.assertEqual(self.top.message, 'foobles')
 
72
        self.assertEqual(self.top.current, 1)
 
73
        self.assertEqual(self.top.total, 2)
 
74
        self.assertEqual(self.top.child_fraction, 0)
 
75
        child = ChildProgress(_stack=FakeStack(self.top))
 
76
        child.update('baubles', 2, 4)
 
77
        self.assertEqual(self.top.message, 'foobles')
 
78
        self.assertEqual(self.top.current, 1)
 
79
        self.assertEqual(self.top.total, 2)
 
80
        self.assertEqual(self.top.child_fraction, 0.5)
 
81
        grandchild = ChildProgress(_stack=FakeStack(child))
 
82
        grandchild.update('barbells', 1, 2)
 
83
        self.assertEqual(self.top.child_fraction, 0.625)
 
84
        self.assertEqual(child.child_fraction, 0.5)
 
85
        child.update('baubles', 3, 4)
 
86
        self.assertEqual(child.child_fraction, 0)
 
87
        self.assertEqual(self.top.child_fraction, 0.75)
 
88
        grandchild.update('barbells', 1, 2)
 
89
        self.assertEqual(self.top.child_fraction, 0.875)
 
90
        grandchild.update('barbells', 2, 2)
 
91
        self.assertEqual(self.top.child_fraction, 1)
 
92
        child.update('baubles', 4, 4)
 
93
        self.assertEqual(self.top.child_fraction, 1)
 
94
        #test clamping
 
95
        grandchild.update('barbells', 2, 2)
 
96
        self.assertEqual(self.top.child_fraction, 1)
 
97
 
 
98
    def test_implementations(self):
 
99
        for implementation in (TTYProgressBar, DotsProgressBar, 
 
100
                               DummyProgress):
 
101
            self.check_parent_handling(implementation)
 
102
 
 
103
    def check_parent_handling(self, parentclass):
 
104
        top = parentclass(to_file=StringIO())
 
105
        top.update('foobles', 1, 2)
 
106
        child = ChildProgress(_stack=FakeStack(top))
 
107
        child.update('baubles', 4, 4)
 
108
        top.update('lala', 2, 2)
 
109
        child.update('baubles', 4, 4)
 
110
 
 
111
    def test_stacking(self):
 
112
        self.check_stack(TTYProgressBar, ChildProgress)
 
113
        self.check_stack(DotsProgressBar, ChildProgress)
 
114
        self.check_stack(DummyProgress, DummyProgress)
 
115
 
 
116
    def check_stack(self, parent_class, child_class):
 
117
        stack = ProgressBarStack(klass=parent_class, to_file=StringIO())
 
118
        parent = stack.get_nested()
 
119
        try:
 
120
            self.assertIs(parent.__class__, parent_class)
 
121
            child = stack.get_nested()
 
122
            try:
 
123
                self.assertIs(child.__class__, child_class)
 
124
            finally:
 
125
                child.finished()
 
126
        finally:
 
127
            parent.finished()
 
128
 
 
129
    def test_throttling(self):
 
130
        pb = InstrumentedProgress(to_file=StringIO())
 
131
        # instantaneous updates should be squelched
 
132
        pb.update('me', 1, 1)
 
133
        self.assertTrue(pb.always_throttled)
 
134
        pb = InstrumentedProgress(to_file=StringIO())
 
135
        # It's like an instant sleep(1)!
 
136
        pb.start_time -= 1
 
137
        # Updates after a second should not be squelched
 
138
        pb.update('me', 1, 1)
 
139
        self.assertFalse(pb.always_throttled)
58
140
 
59
141
    def test_clear(self):
60
 
        # <https://bugs.launchpad.net/bzr/+bug/611127> clear must actually
61
 
        # send spaces to clear the line
62
 
        out, view = self.make_view()
63
 
        task = self.make_task(None, view, 'reticulating splines', 5, 20)
64
 
        view.show_progress(task)
65
 
        self.assertEqual(
66
 
'\r/ reticulating splines 5/20                                                    \r'
67
 
            , out.getvalue())
68
 
        view.clear()
69
 
        self.assertEqual(
70
 
'\r/ reticulating splines 5/20                                                    \r'
71
 
            + '\r' + 79 * ' ' + '\r',
72
 
            out.getvalue())
73
 
 
74
 
    def test_render_progress_no_bar(self):
75
 
        """The default view now has a spinner but no bar."""
76
 
        out, view = self.make_view()
77
 
        # view.enable_bar = False
78
 
        task = self.make_task(None, view, 'reticulating splines', 5, 20)
79
 
        view.show_progress(task)
80
 
        self.assertEqual(
81
 
'\r/ reticulating splines 5/20                                                    \r'
82
 
            , out.getvalue())
83
 
 
84
 
    def test_render_progress_easy(self):
85
 
        """Just one task and one quarter done"""
86
 
        out, view = self.make_view()
87
 
        view.enable_bar = True
88
 
        task = self.make_task(None, view, 'reticulating splines', 5, 20)
89
 
        view.show_progress(task)
90
 
        self.assertEqual(
91
 
'\r[####/               ] reticulating splines 5/20                               \r'
92
 
            , out.getvalue())
93
 
 
94
 
    def test_render_progress_nested(self):
95
 
        """Tasks proportionally contribute to overall progress"""
96
 
        out, view = self.make_view()
97
 
        task = self.make_task(None, view, 'reticulating splines', 0, 2)
98
 
        task2 = self.make_task(task, view, 'stage2', 1, 2)
99
 
        view.show_progress(task2)
100
 
        view.enable_bar = True
101
 
        # so we're in the first half of the main task, and half way through
102
 
        # that
103
 
        self.assertEqual(
104
 
'[####-               ] reticulating splines:stage2 1/2                         '
105
 
            , view._render_line())
106
 
        # if the nested task is complete, then we're all the way through the
107
 
        # first half of the overall work
108
 
        task2.update('stage2', 2, 2)
109
 
        self.assertEqual(
110
 
'[#########\          ] reticulating splines:stage2 2/2                         '
111
 
            , view._render_line())
112
 
 
113
 
    def test_render_progress_sub_nested(self):
114
 
        """Intermediate tasks don't mess up calculation."""
115
 
        out, view = self.make_view()
116
 
        view.enable_bar = True
117
 
        task_a = ProgressTask(None, progress_view=view)
118
 
        task_a.update('a', 0, 2)
119
 
        task_b = ProgressTask(task_a, progress_view=view)
120
 
        task_b.update('b')
121
 
        task_c = ProgressTask(task_b, progress_view=view)
122
 
        task_c.update('c', 1, 2)
123
 
        # the top-level task is in its first half; the middle one has no
124
 
        # progress indication, just a label; and the bottom one is half done,
125
 
        # so the overall fraction is 1/4
126
 
        self.assertEqual(
127
 
'[####|               ] a:b:c 1/2                                               '
128
 
            , view._render_line())
129
 
 
130
 
    def test_render_truncated(self):
131
 
        # when the bar is too long for the terminal, we prefer not to truncate
132
 
        # the counters because they might be interesting, and because
133
 
        # truncating the numbers might be misleading
134
 
        out, view = self.make_view()
135
 
        task_a = ProgressTask(None, progress_view=view)
136
 
        task_a.update('start_' + 'a' * 200 + '_end', 2000, 5000)
137
 
        line = view._render_line()
138
 
        self.assertEqual(
139
 
'- start_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.. 2000/5000',
140
 
           line) 
141
 
        self.assertEqual(len(line), 79)
142
 
 
143
 
 
144
 
    def test_render_with_activity(self):
145
 
        # if the progress view has activity, it's shown before the spinner
146
 
        out, view = self.make_view()
147
 
        task_a = ProgressTask(None, progress_view=view)
148
 
        view._last_transport_msg = '   123kB   100kB/s '
149
 
        line = view._render_line()
150
 
        self.assertEqual(
151
 
'   123kB   100kB/s /                                                           ',
152
 
           line) 
153
 
        self.assertEqual(len(line), 79)
154
 
 
155
 
        task_a.update('start_' + 'a' * 200 + '_end', 2000, 5000)
156
 
        view._last_transport_msg = '   123kB   100kB/s '
157
 
        line = view._render_line()
158
 
        self.assertEqual(
159
 
'   123kB   100kB/s \\ start_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.. 2000/5000',
160
 
           line) 
161
 
        self.assertEqual(len(line), 79)
162
 
 
163
 
    def test_render_progress_unicode_enc_utf8(self):
164
 
        out = tests.StringIOWrapper()
165
 
        out.encoding = "utf-8"
166
 
        view = self.make_view_only(out, 20)
167
 
        task = self.make_task(None, view, u"\xa7", 0, 1)
168
 
        view.show_progress(task)
169
 
        self.assertEqual('\r/ \xc2\xa7 0/1            \r',
170
 
            out.getvalue())
171
 
 
172
 
    def test_render_progress_unicode_enc_missing(self):
173
 
        out = StringIO()
174
 
        self.assertRaises(AttributeError, getattr, out, "encoding")
175
 
        view = self.make_view_only(out, 20)
176
 
        task = self.make_task(None, view, u"\xa7", 0, 1)
177
 
        view.show_progress(task)
178
 
        self.assertEqual('\r/ ? 0/1             \r',
179
 
            out.getvalue())
180
 
 
181
 
    def test_render_progress_unicode_enc_none(self):
182
 
        out = tests.StringIOWrapper()
183
 
        out.encoding = None
184
 
        view = self.make_view_only(out, 20)
185
 
        task = self.make_task(None, view, u"\xa7", 0, 1)
186
 
        view.show_progress(task)
187
 
        self.assertEqual('\r/ ? 0/1             \r',
188
 
            out.getvalue())
 
142
        sio = StringIO()
 
143
        pb = TTYProgressBar(to_file=sio, show_eta=False)
 
144
        pb.width = 20 # Just make it easier to test
 
145
        # This should not output anything
 
146
        pb.clear()
 
147
        # These two should not be displayed because
 
148
        # of throttling
 
149
        pb.update('foo', 1, 3)
 
150
        pb.update('bar', 2, 3)
 
151
        # So pb.clear() has nothing to do
 
152
        pb.clear()
 
153
 
 
154
        # Make sure the next update isn't throttled
 
155
        pb.start_time -= 1
 
156
        pb.update('baz', 3, 3)
 
157
        pb.clear()
 
158
 
 
159
        self.assertEqual('\r[=========] baz 3/3'
 
160
                         '\r                   \r',
 
161
                         sio.getvalue())
 
162
 
 
163
    def test_no_eta(self):
 
164
        # An old version of the progress bar would
 
165
        # store every update if show_eta was false
 
166
        # because the eta routine was where it was
 
167
        # cleaned out
 
168
        pb = InstrumentedProgress(to_file=StringIO(), show_eta=False)
 
169
        # Just make sure this first few are throttled
 
170
        pb.start_time += 5
 
171
 
 
172
        # These messages are throttled, and don't contribute
 
173
        for count in xrange(100):
 
174
            pb.update('x', count, 300)
 
175
        self.assertEqual(0, len(pb.last_updates))
 
176
 
 
177
        # Unthrottle by time
 
178
        pb.start_time -= 10
 
179
 
 
180
        # These happen too fast, so only one gets through
 
181
        for count in xrange(100):
 
182
            pb.update('x', count+100, 200)
 
183
        self.assertEqual(1, len(pb.last_updates))
 
184
 
 
185
        pb.MIN_PAUSE = 0.0
 
186
 
 
187
        # But all of these go through, don't let the
 
188
        # last_update list grow without bound
 
189
        for count in xrange(100):
 
190
            pb.update('x', count+100, 200)
 
191
 
 
192
        self.assertEqual(pb._max_last_updates, len(pb.last_updates))
 
193
 
 
194
 
 
195
class TestProgressTypes(TestCase):
 
196
    """Test that the right ProgressBar gets instantiated at the right time."""
 
197
 
 
198
    def get_nested(self, outf, term, env_progress=None):
 
199
        """Setup so that ProgressBar thinks we are in the supplied terminal."""
 
200
        orig_term = os.environ.get('TERM')
 
201
        orig_progress = os.environ.get('BZR_PROGRESS_BAR')
 
202
        os.environ['TERM'] = term
 
203
        if env_progress is not None:
 
204
            os.environ['BZR_PROGRESS_BAR'] = env_progress
 
205
        elif orig_progress is not None:
 
206
            del os.environ['BZR_PROGRESS_BAR']
 
207
 
 
208
        def reset():
 
209
            if orig_term is None:
 
210
                del os.environ['TERM']
 
211
            else:
 
212
                os.environ['TERM'] = orig_term
 
213
            # We may have never created BZR_PROGRESS_BAR
 
214
            # So we can't just delete like we can 'TERM' (which is always set)
 
215
            if orig_progress is None:
 
216
                if 'BZR_PROGRESS_BAR' in os.environ:
 
217
                    del os.environ['BZR_PROGRESS_BAR']
 
218
            else:
 
219
                os.environ['BZR_PROGRESS_BAR'] = orig_progress
 
220
 
 
221
        self.addCleanup(reset)
 
222
 
 
223
        stack = ProgressBarStack(to_file=outf)
 
224
        pb = stack.get_nested()
 
225
        pb.start_time -= 1 # Make sure it is ready to write
 
226
        pb.width = 20 # And it is of reasonable size
 
227
        return pb
 
228
 
 
229
    def test_tty_progress(self):
 
230
        # Make sure the ProgressBarStack thinks it is
 
231
        # writing out to a terminal, and thus uses a TTYProgressBar
 
232
        out = _TTYStringIO()
 
233
        pb = self.get_nested(out, 'xterm')
 
234
        self.assertIsInstance(pb, TTYProgressBar)
 
235
        try:
 
236
            pb.update('foo', 1, 2)
 
237
            pb.update('bar', 2, 2)
 
238
        finally:
 
239
            pb.finished()
 
240
 
 
241
        self.assertEqual('\r/ [====   ] foo 1/2'
 
242
                         '\r- [=======] bar 2/2'
 
243
                         '\r                   \r',
 
244
                         out.getvalue())
 
245
 
 
246
    def test_dots_progress(self):
 
247
        # Make sure the ProgressBarStack thinks it is
 
248
        # not writing out to a terminal, and thus uses a 
 
249
        # DotsProgressBar
 
250
        out = _NonTTYStringIO()
 
251
        pb = self.get_nested(out, 'xterm')
 
252
        self.assertIsInstance(pb, DotsProgressBar)
 
253
        try:
 
254
            pb.update('foo', 1, 2)
 
255
            pb.update('bar', 2, 2)
 
256
        finally:
 
257
            pb.finished()
 
258
 
 
259
        self.assertEqual('foo: .'
 
260
                         '\nbar: .'
 
261
                         '\n',
 
262
                         out.getvalue())
 
263
 
 
264
    def test_no_isatty_progress(self):
 
265
        # Make sure ProgressBarStack handles a plain StringIO()
 
266
        import cStringIO
 
267
        out = cStringIO.StringIO()
 
268
        pb = self.get_nested(out, 'xterm')
 
269
        pb.finished()
 
270
        self.assertIsInstance(pb, DotsProgressBar)
 
271
 
 
272
    def test_dumb_progress(self):
 
273
        # Make sure the ProgressBarStack thinks it is writing out to a 
 
274
        # terminal, but it is the emacs 'dumb' terminal, so it uses
 
275
        # Dots
 
276
        out = _TTYStringIO()
 
277
        pb = self.get_nested(out, 'dumb')
 
278
        pb.finished()
 
279
        self.assertIsInstance(pb, DotsProgressBar)
 
280
 
 
281
    def test_progress_env_tty(self):
 
282
        # The environ variable BZR_PROGRESS_BAR controls what type of
 
283
        # progress bar we will get, even if it wouldn't usually be that type
 
284
        import cStringIO
 
285
 
 
286
        # Usually, this would be a DotsProgressBar
 
287
        out = cStringIO.StringIO()
 
288
        pb = self.get_nested(out, 'dumb', 'tty')
 
289
        pb.finished()
 
290
        # Even though we are not a tty, the env_var will override
 
291
        self.assertIsInstance(pb, TTYProgressBar)
 
292
 
 
293
    def test_progress_env_dots(self):
 
294
        # Even though we are in a tty, the env_var will override
 
295
        out = _TTYStringIO()
 
296
        pb = self.get_nested(out, 'xterm', 'dots')
 
297
        pb.finished()
 
298
        self.assertIsInstance(pb, DotsProgressBar)
 
299
 
 
300
    def test_progress_env_none(self):
 
301
        # Even though we are in a valid tty, no progress
 
302
        out = _TTYStringIO()
 
303
        pb = self.get_nested(out, 'xterm', 'none')
 
304
        pb.finished()
 
305
        self.assertIsInstance(pb, DummyProgress)
 
306
 
 
307
    def test_progress_env_invalid(self):
 
308
        out = _TTYStringIO()
 
309
        self.assertRaises(errors.InvalidProgressBarType, self.get_nested,
 
310
            out, 'xterm', 'nonexistant')