~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

[merge] robert's knit-performance work

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2009, 2010, 2011 Canonical Ltd
 
1
# Copyright (C) 2005, 2006 by Canonical Ltd
2
2
#
3
3
# This program is free software; you can redistribute it and/or modify
4
4
# it under the terms of the GNU General Public License as published by
12
12
#
13
13
# You should have received a copy of the GNU General Public License
14
14
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
16
 
17
17
"""Tests for rio serialization
18
18
 
22
22
but this depends on the transport.
23
23
"""
24
24
 
25
 
import re
 
25
import cStringIO
 
26
import os
 
27
import sys
26
28
from tempfile import TemporaryFile
27
29
 
28
 
from bzrlib import (
29
 
    rio,
30
 
    )
31
 
from bzrlib.tests import TestCase
32
 
from bzrlib.rio import (
33
 
    RioReader,
34
 
    Stanza,
35
 
    read_stanza,
36
 
    read_stanzas,
37
 
    rio_file,
38
 
    )
 
30
from bzrlib.tests import TestCaseInTempDir, TestCase
 
31
from bzrlib.rio import RioWriter, Stanza, read_stanza, read_stanzas
39
32
 
40
33
 
41
34
class TestRio(TestCase):
56
49
        # these aren't enforced at construction time
57
50
        ## self.assertRaises(ValueError,
58
51
        ##        Stanza, complex=42 + 3j)
59
 
        ## self.assertRaises(ValueError,
 
52
        ## self.assertRaises(ValueError, 
60
53
        ##        Stanza, several=range(10))
61
54
 
62
55
    def test_empty_value(self):
125
118
    def test_repeated_field(self):
126
119
        """Repeated field in rio"""
127
120
        s = Stanza()
128
 
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'),
 
121
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'), 
129
122
                     ('a', '1000'), ('b', '2000')]:
130
123
            s.add(k, v)
131
124
        s2 = read_stanza(s.to_lines())
143
136
    def test_blank_line(self):
144
137
        s = Stanza(none='', one='\n', two='\n\n')
145
138
        self.assertEqualDiff(s.to_string(), """\
146
 
none:\x20
147
 
one:\x20
 
139
none: 
 
140
one: 
148
141
\t
149
 
two:\x20
 
142
two: 
150
143
\t
151
144
\t
152
145
""")
156
149
    def test_whitespace_value(self):
157
150
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
158
151
        self.assertEqualDiff(s.to_string(), """\
159
 
combo:\x20
 
152
combo: 
160
153
\t\t\t
161
154
\t
162
 
space:\x20\x20
 
155
space:  
163
156
tabs: \t\t\t
164
157
""")
165
158
        s2 = read_stanza(s.to_lines())
166
159
        self.assertEquals(s, s2)
167
 
        self.rio_file_stanzas([s])
168
160
 
169
161
    def test_quoted(self):
170
162
        """rio quoted string cases"""
171
 
        s = Stanza(q1='"hello"',
172
 
                   q2=' "for',
 
163
        s = Stanza(q1='"hello"', 
 
164
                   q2=' "for', 
173
165
                   q3='\n\n"for"\n',
174
166
                   q4='for\n"\nfor',
175
167
                   q5='\n',
176
 
                   q6='"',
 
168
                   q6='"', 
177
169
                   q7='""',
178
170
                   q8='\\',
179
171
                   q9='\\"\\"',
180
172
                   )
181
173
        s2 = read_stanza(s.to_lines())
182
174
        self.assertEquals(s, s2)
183
 
        # apparent bug in read_stanza
184
 
        # s3 = read_stanza(self.stanzas_to_str([s]))
185
 
        # self.assertEquals(s, s3)
186
175
 
187
176
    def test_read_empty(self):
188
177
        """Detect end of rio file"""
189
178
        s = read_stanza([])
190
179
        self.assertEqual(s, None)
191
180
        self.assertTrue(s is None)
192
 
 
193
 
    def test_read_nul_byte(self):
194
 
        """File consisting of a nul byte causes an error."""
195
 
        self.assertRaises(ValueError, read_stanza, ['\0'])
196
 
 
197
 
    def test_read_nul_bytes(self):
198
 
        """File consisting of many nul bytes causes an error."""
199
 
        self.assertRaises(ValueError, read_stanza, ['\0' * 100])
200
 
 
 
181
        
201
182
    def test_read_iter(self):
202
183
        """Read several stanzas from file"""
203
184
        tmpf = TemporaryFile()
214
195
        reader = read_stanzas(tmpf)
215
196
        read_iter = iter(reader)
216
197
        stuff = list(reader)
217
 
        self.assertEqual(stuff,
 
198
        self.assertEqual(stuff, 
218
199
                [ Stanza(version_header='1'),
219
200
                  Stanza(name="foo", val='123'),
220
201
                  Stanza(name="bar", val='129319'), ])
248
229
        self.assertEquals(s, Stanza(name="bar", val='129319'))
249
230
        s = read_stanza(tmpf)
250
231
        self.assertEquals(s, None)
251
 
        self.check_rio_file(tmpf)
252
 
 
253
 
    def check_rio_file(self, real_file):
254
 
        real_file.seek(0)
255
 
        read_write = rio_file(RioReader(real_file)).read()
256
 
        real_file.seek(0)
257
 
        self.assertEquals(read_write, real_file.read())
258
 
 
259
 
    @staticmethod
260
 
    def stanzas_to_str(stanzas):
261
 
        return rio_file(stanzas).read()
262
 
 
263
 
    def rio_file_stanzas(self, stanzas):
264
 
        new_stanzas = list(RioReader(rio_file(stanzas)))
265
 
        self.assertEqual(new_stanzas, stanzas)
266
232
 
267
233
    def test_tricky_quoted(self):
268
234
        tmpf = TemporaryFile()
269
235
        tmpf.write('''\
270
236
s: "one"
271
237
 
272
 
s:\x20
 
238
s: 
273
239
\t"one"
274
240
\t
275
241
 
279
245
 
280
246
s: """
281
247
 
282
 
s:\x20
 
248
s: 
283
249
\t
284
250
 
285
251
s: \\
286
252
 
287
 
s:\x20
 
253
s: 
288
254
\t\\
289
255
\t\\\\
290
256
\t
314
280
            ]
315
281
        for expected in expected_vals:
316
282
            stanza = read_stanza(tmpf)
317
 
            self.rio_file_stanzas([stanza])
318
283
            self.assertEquals(len(stanza), 1)
319
284
            self.assertEqualDiff(stanza.get('s'), expected)
320
285
 
334
299
        self.assertRaises(TypeError, s.add, 10, {})
335
300
 
336
301
    def test_rio_unicode(self):
 
302
        # intentionally use cStringIO which doesn't accomodate unencoded unicode objects
 
303
        sio = cStringIO.StringIO()
337
304
        uni_data = u'\N{KATAKANA LETTER O}'
338
305
        s = Stanza(foo=uni_data)
339
306
        self.assertEquals(s.get('foo'), uni_data)
343
310
        new_s = read_stanza(raw_lines)
344
311
        self.assertEquals(new_s.get('foo'), uni_data)
345
312
 
346
 
    def test_rio_to_unicode(self):
347
 
        uni_data = u'\N{KATAKANA LETTER O}'
348
 
        s = Stanza(foo=uni_data)
349
 
        unicode_str = s.to_unicode()
350
 
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
351
 
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
352
 
        self.assertEqual(uni_data, new_s.get('foo'))
353
 
 
354
 
    def test_nested_rio_unicode(self):
355
 
        uni_data = u'\N{KATAKANA LETTER O}'
356
 
        s = Stanza(foo=uni_data)
357
 
        parent_stanza = Stanza(child=s.to_unicode())
358
 
        raw_lines = parent_stanza.to_lines()
359
 
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
360
 
                          '\t\n',
361
 
                         ], raw_lines)
362
 
        new_parent = read_stanza(raw_lines)
363
 
        child_text = new_parent.get('child')
364
 
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
365
 
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
366
 
        self.assertEqual(uni_data, new_child.get('foo'))
367
 
 
368
 
    def mail_munge(self, lines, dos_nl=True):
369
 
        new_lines = []
370
 
        for line in lines:
371
 
            line = re.sub(' *\n', '\n', line)
372
 
            if dos_nl:
373
 
                line = re.sub('([^\r])\n', '\\1\r\n', line)
374
 
            new_lines.append(line)
375
 
        return new_lines
376
 
 
377
 
    def test_patch_rio(self):
378
 
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
379
 
        lines = rio.to_patch_lines(stanza)
380
 
        for line in lines:
381
 
            self.assertContainsRe(line, '^# ')
382
 
            self.assertTrue(72 >= len(line))
383
 
        for line in rio.to_patch_lines(stanza, max_width=12):
384
 
            self.assertTrue(12 >= len(line))
385
 
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
386
 
                                                           dos_nl=False))
387
 
        lines = self.mail_munge(lines)
388
 
        new_stanza = rio.read_patch_stanza(lines)
389
 
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
390
 
        self.assertEqual(' '* 255, new_stanza.get('space'))
391
 
        self.assertEqual('#'* 255, new_stanza.get('hash'))
392
 
 
393
 
    def test_patch_rio_linebreaks(self):
394
 
        stanza = Stanza(breaktest='linebreak -/'*30)
395
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
396
 
                              'linebreak\\\\\n')
397
 
        stanza = Stanza(breaktest='linebreak-/'*30)
398
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
399
 
                              'linebreak-\\\\\n')
400
 
        stanza = Stanza(breaktest='linebreak/'*30)
401
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
402
 
                              'linebreak\\\\\n')