~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

new encoder, allows non monotonically increasing sequence matches for moar compression.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006, 2007, 2009, 2010, 2011 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
"""Tests for rio serialization
18
 
 
19
 
A simple, reproducible structured IO format.
20
 
 
21
 
rio itself works in Unicode strings.  It is typically encoded to UTF-8,
22
 
but this depends on the transport.
23
 
"""
24
 
 
25
 
import re
26
 
from tempfile import TemporaryFile
27
 
 
28
 
from bzrlib import (
29
 
    rio,
30
 
    )
31
 
from bzrlib.tests import TestCase
32
 
from bzrlib.rio import (
33
 
    RioReader,
34
 
    Stanza,
35
 
    read_stanza,
36
 
    read_stanzas,
37
 
    rio_file,
38
 
    )
39
 
 
40
 
 
41
 
class TestRio(TestCase):
42
 
 
43
 
    def test_stanza(self):
44
 
        """Construct rio stanza in memory"""
45
 
        s = Stanza(number='42', name="fred")
46
 
        self.assertTrue('number' in s)
47
 
        self.assertFalse('color' in s)
48
 
        self.assertFalse('42' in s)
49
 
        self.assertEquals(list(s.iter_pairs()),
50
 
                [('name', 'fred'), ('number', '42')])
51
 
        self.assertEquals(s.get('number'), '42')
52
 
        self.assertEquals(s.get('name'), 'fred')
53
 
 
54
 
    def test_value_checks(self):
55
 
        """rio checks types on construction"""
56
 
        # these aren't enforced at construction time
57
 
        ## self.assertRaises(ValueError,
58
 
        ##        Stanza, complex=42 + 3j)
59
 
        ## self.assertRaises(ValueError,
60
 
        ##        Stanza, several=range(10))
61
 
 
62
 
    def test_empty_value(self):
63
 
        """Serialize stanza with empty field"""
64
 
        s = Stanza(empty='')
65
 
        self.assertEqualDiff(s.to_string(),
66
 
                "empty: \n")
67
 
 
68
 
    def test_to_lines(self):
69
 
        """Write simple rio stanza to string"""
70
 
        s = Stanza(number='42', name='fred')
71
 
        self.assertEquals(list(s.to_lines()),
72
 
                ['name: fred\n',
73
 
                 'number: 42\n'])
74
 
 
75
 
    def test_as_dict(self):
76
 
        """Convert rio Stanza to dictionary"""
77
 
        s = Stanza(number='42', name='fred')
78
 
        sd = s.as_dict()
79
 
        self.assertEquals(sd, dict(number='42', name='fred'))
80
 
 
81
 
    def test_to_file(self):
82
 
        """Write rio to file"""
83
 
        tmpf = TemporaryFile()
84
 
        s = Stanza(a_thing='something with "quotes like \\"this\\""', number='42', name='fred')
85
 
        s.write(tmpf)
86
 
        tmpf.seek(0)
87
 
        self.assertEqualDiff(tmpf.read(), r'''
88
 
a_thing: something with "quotes like \"this\""
89
 
name: fred
90
 
number: 42
91
 
'''[1:])
92
 
 
93
 
    def test_multiline_string(self):
94
 
        tmpf = TemporaryFile()
95
 
        s = Stanza(motto="war is peace\nfreedom is slavery\nignorance is strength")
96
 
        s.write(tmpf)
97
 
        tmpf.seek(0)
98
 
        self.assertEqualDiff(tmpf.read(), '''\
99
 
motto: war is peace
100
 
\tfreedom is slavery
101
 
\tignorance is strength
102
 
''')
103
 
        tmpf.seek(0)
104
 
        s2 = read_stanza(tmpf)
105
 
        self.assertEquals(s, s2)
106
 
 
107
 
    def test_read_stanza(self):
108
 
        """Load stanza from string"""
109
 
        lines = """\
110
 
revision: mbp@sourcefrog.net-123-abc
111
 
timestamp: 1130653962
112
 
timezone: 36000
113
 
committer: Martin Pool <mbp@test.sourcefrog.net>
114
 
""".splitlines(True)
115
 
        s = read_stanza(lines)
116
 
        self.assertTrue('revision' in s)
117
 
        self.assertEqualDiff(s.get('revision'), 'mbp@sourcefrog.net-123-abc')
118
 
        self.assertEquals(list(s.iter_pairs()),
119
 
                [('revision', 'mbp@sourcefrog.net-123-abc'),
120
 
                 ('timestamp', '1130653962'),
121
 
                 ('timezone', '36000'),
122
 
                 ('committer', "Martin Pool <mbp@test.sourcefrog.net>")])
123
 
        self.assertEquals(len(s), 4)
124
 
 
125
 
    def test_repeated_field(self):
126
 
        """Repeated field in rio"""
127
 
        s = Stanza()
128
 
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'),
129
 
                     ('a', '1000'), ('b', '2000')]:
130
 
            s.add(k, v)
131
 
        s2 = read_stanza(s.to_lines())
132
 
        self.assertEquals(s, s2)
133
 
        self.assertEquals(s.get_all('a'), map(str, [10, 100, 1000]))
134
 
        self.assertEquals(s.get_all('b'), map(str, [20, 200, 2000]))
135
 
 
136
 
    def test_backslash(self):
137
 
        s = Stanza(q='\\')
138
 
        t = s.to_string()
139
 
        self.assertEqualDiff(t, 'q: \\\n')
140
 
        s2 = read_stanza(s.to_lines())
141
 
        self.assertEquals(s, s2)
142
 
 
143
 
    def test_blank_line(self):
144
 
        s = Stanza(none='', one='\n', two='\n\n')
145
 
        self.assertEqualDiff(s.to_string(), """\
146
 
none:\x20
147
 
one:\x20
148
 
\t
149
 
two:\x20
150
 
\t
151
 
\t
152
 
""")
153
 
        s2 = read_stanza(s.to_lines())
154
 
        self.assertEquals(s, s2)
155
 
 
156
 
    def test_whitespace_value(self):
157
 
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
158
 
        self.assertEqualDiff(s.to_string(), """\
159
 
combo:\x20
160
 
\t\t\t
161
 
\t
162
 
space:\x20\x20
163
 
tabs: \t\t\t
164
 
""")
165
 
        s2 = read_stanza(s.to_lines())
166
 
        self.assertEquals(s, s2)
167
 
        self.rio_file_stanzas([s])
168
 
 
169
 
    def test_quoted(self):
170
 
        """rio quoted string cases"""
171
 
        s = Stanza(q1='"hello"',
172
 
                   q2=' "for',
173
 
                   q3='\n\n"for"\n',
174
 
                   q4='for\n"\nfor',
175
 
                   q5='\n',
176
 
                   q6='"',
177
 
                   q7='""',
178
 
                   q8='\\',
179
 
                   q9='\\"\\"',
180
 
                   )
181
 
        s2 = read_stanza(s.to_lines())
182
 
        self.assertEquals(s, s2)
183
 
        # apparent bug in read_stanza
184
 
        # s3 = read_stanza(self.stanzas_to_str([s]))
185
 
        # self.assertEquals(s, s3)
186
 
 
187
 
    def test_read_empty(self):
188
 
        """Detect end of rio file"""
189
 
        s = read_stanza([])
190
 
        self.assertEqual(s, None)
191
 
        self.assertTrue(s is None)
192
 
 
193
 
    def test_read_nul_byte(self):
194
 
        """File consisting of a nul byte causes an error."""
195
 
        self.assertRaises(ValueError, read_stanza, ['\0'])
196
 
 
197
 
    def test_read_nul_bytes(self):
198
 
        """File consisting of many nul bytes causes an error."""
199
 
        self.assertRaises(ValueError, read_stanza, ['\0' * 100])
200
 
 
201
 
    def test_read_iter(self):
202
 
        """Read several stanzas from file"""
203
 
        tmpf = TemporaryFile()
204
 
        tmpf.write("""\
205
 
version_header: 1
206
 
 
207
 
name: foo
208
 
val: 123
209
 
 
210
 
name: bar
211
 
val: 129319
212
 
""")
213
 
        tmpf.seek(0)
214
 
        reader = read_stanzas(tmpf)
215
 
        read_iter = iter(reader)
216
 
        stuff = list(reader)
217
 
        self.assertEqual(stuff,
218
 
                [ Stanza(version_header='1'),
219
 
                  Stanza(name="foo", val='123'),
220
 
                  Stanza(name="bar", val='129319'), ])
221
 
 
222
 
    def test_read_several(self):
223
 
        """Read several stanzas from file"""
224
 
        tmpf = TemporaryFile()
225
 
        tmpf.write("""\
226
 
version_header: 1
227
 
 
228
 
name: foo
229
 
val: 123
230
 
 
231
 
name: quoted
232
 
address:   "Willowglen"
233
 
\t  42 Wallaby Way
234
 
\t  Sydney
235
 
 
236
 
name: bar
237
 
val: 129319
238
 
""")
239
 
        tmpf.seek(0)
240
 
        s = read_stanza(tmpf)
241
 
        self.assertEquals(s, Stanza(version_header='1'))
242
 
        s = read_stanza(tmpf)
243
 
        self.assertEquals(s, Stanza(name="foo", val='123'))
244
 
        s = read_stanza(tmpf)
245
 
        self.assertEqualDiff(s.get('name'), 'quoted')
246
 
        self.assertEqualDiff(s.get('address'), '  "Willowglen"\n  42 Wallaby Way\n  Sydney')
247
 
        s = read_stanza(tmpf)
248
 
        self.assertEquals(s, Stanza(name="bar", val='129319'))
249
 
        s = read_stanza(tmpf)
250
 
        self.assertEquals(s, None)
251
 
        self.check_rio_file(tmpf)
252
 
 
253
 
    def check_rio_file(self, real_file):
254
 
        real_file.seek(0)
255
 
        read_write = rio_file(RioReader(real_file)).read()
256
 
        real_file.seek(0)
257
 
        self.assertEquals(read_write, real_file.read())
258
 
 
259
 
    @staticmethod
260
 
    def stanzas_to_str(stanzas):
261
 
        return rio_file(stanzas).read()
262
 
 
263
 
    def rio_file_stanzas(self, stanzas):
264
 
        new_stanzas = list(RioReader(rio_file(stanzas)))
265
 
        self.assertEqual(new_stanzas, stanzas)
266
 
 
267
 
    def test_tricky_quoted(self):
268
 
        tmpf = TemporaryFile()
269
 
        tmpf.write('''\
270
 
s: "one"
271
 
 
272
 
s:\x20
273
 
\t"one"
274
 
\t
275
 
 
276
 
s: "
277
 
 
278
 
s: ""
279
 
 
280
 
s: """
281
 
 
282
 
s:\x20
283
 
\t
284
 
 
285
 
s: \\
286
 
 
287
 
s:\x20
288
 
\t\\
289
 
\t\\\\
290
 
\t
291
 
 
292
 
s: word\\
293
 
 
294
 
s: quote"
295
 
 
296
 
s: backslashes\\\\\\
297
 
 
298
 
s: both\\\"
299
 
 
300
 
''')
301
 
        tmpf.seek(0)
302
 
        expected_vals = ['"one"',
303
 
            '\n"one"\n',
304
 
            '"',
305
 
            '""',
306
 
            '"""',
307
 
            '\n',
308
 
            '\\',
309
 
            '\n\\\n\\\\\n',
310
 
            'word\\',
311
 
            'quote\"',
312
 
            'backslashes\\\\\\',
313
 
            'both\\\"',
314
 
            ]
315
 
        for expected in expected_vals:
316
 
            stanza = read_stanza(tmpf)
317
 
            self.rio_file_stanzas([stanza])
318
 
            self.assertEquals(len(stanza), 1)
319
 
            self.assertEqualDiff(stanza.get('s'), expected)
320
 
 
321
 
    def test_write_empty_stanza(self):
322
 
        """Write empty stanza"""
323
 
        l = list(Stanza().to_lines())
324
 
        self.assertEquals(l, [])
325
 
 
326
 
    def test_rio_raises_type_error(self):
327
 
        """TypeError on adding invalid type to Stanza"""
328
 
        s = Stanza()
329
 
        self.assertRaises(TypeError, s.add, 'foo', {})
330
 
 
331
 
    def test_rio_raises_type_error_key(self):
332
 
        """TypeError on adding invalid type to Stanza"""
333
 
        s = Stanza()
334
 
        self.assertRaises(TypeError, s.add, 10, {})
335
 
 
336
 
    def test_rio_unicode(self):
337
 
        uni_data = u'\N{KATAKANA LETTER O}'
338
 
        s = Stanza(foo=uni_data)
339
 
        self.assertEquals(s.get('foo'), uni_data)
340
 
        raw_lines = s.to_lines()
341
 
        self.assertEquals(raw_lines,
342
 
                ['foo: ' + uni_data.encode('utf-8') + '\n'])
343
 
        new_s = read_stanza(raw_lines)
344
 
        self.assertEquals(new_s.get('foo'), uni_data)
345
 
 
346
 
    def test_rio_to_unicode(self):
347
 
        uni_data = u'\N{KATAKANA LETTER O}'
348
 
        s = Stanza(foo=uni_data)
349
 
        unicode_str = s.to_unicode()
350
 
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
351
 
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
352
 
        self.assertEqual(uni_data, new_s.get('foo'))
353
 
 
354
 
    def test_nested_rio_unicode(self):
355
 
        uni_data = u'\N{KATAKANA LETTER O}'
356
 
        s = Stanza(foo=uni_data)
357
 
        parent_stanza = Stanza(child=s.to_unicode())
358
 
        raw_lines = parent_stanza.to_lines()
359
 
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
360
 
                          '\t\n',
361
 
                         ], raw_lines)
362
 
        new_parent = read_stanza(raw_lines)
363
 
        child_text = new_parent.get('child')
364
 
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
365
 
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
366
 
        self.assertEqual(uni_data, new_child.get('foo'))
367
 
 
368
 
    def mail_munge(self, lines, dos_nl=True):
369
 
        new_lines = []
370
 
        for line in lines:
371
 
            line = re.sub(' *\n', '\n', line)
372
 
            if dos_nl:
373
 
                line = re.sub('([^\r])\n', '\\1\r\n', line)
374
 
            new_lines.append(line)
375
 
        return new_lines
376
 
 
377
 
    def test_patch_rio(self):
378
 
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
379
 
        lines = rio.to_patch_lines(stanza)
380
 
        for line in lines:
381
 
            self.assertContainsRe(line, '^# ')
382
 
            self.assertTrue(72 >= len(line))
383
 
        for line in rio.to_patch_lines(stanza, max_width=12):
384
 
            self.assertTrue(12 >= len(line))
385
 
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
386
 
                                                           dos_nl=False))
387
 
        lines = self.mail_munge(lines)
388
 
        new_stanza = rio.read_patch_stanza(lines)
389
 
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
390
 
        self.assertEqual(' '* 255, new_stanza.get('space'))
391
 
        self.assertEqual('#'* 255, new_stanza.get('hash'))
392
 
 
393
 
    def test_patch_rio_linebreaks(self):
394
 
        stanza = Stanza(breaktest='linebreak -/'*30)
395
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
396
 
                              'linebreak\\\\\n')
397
 
        stanza = Stanza(breaktest='linebreak-/'*30)
398
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
399
 
                              'linebreak-\\\\\n')
400
 
        stanza = Stanza(breaktest='linebreak/'*30)
401
 
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
402
 
                              'linebreak\\\\\n')