~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_rio.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-11-03 23:02:16 UTC
  • mfrom: (2951.1.1 pack)
  • Revision ID: pqm@pqm.ubuntu.com-20071103230216-mnmwuxm413lyhjdv
(robertc) Fix data-refresh logic for packs not to refresh mid-transaction when a names write lock is held. (Robert Collins)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for rio serialization
 
18
 
 
19
A simple, reproducible structured IO format.
 
20
 
 
21
rio itself works in Unicode strings.  It is typically encoded to UTF-8,
 
22
but this depends on the transport.
 
23
"""
 
24
 
 
25
import cStringIO
 
26
import os
 
27
import re
 
28
import sys
 
29
from tempfile import TemporaryFile
 
30
 
 
31
from bzrlib import (
 
32
    rio,
 
33
    )
 
34
from bzrlib.tests import TestCaseInTempDir, TestCase
 
35
from bzrlib.rio import (RioWriter, Stanza, read_stanza, read_stanzas, rio_file,
 
36
                        RioReader)
 
37
 
 
38
 
 
39
class TestRio(TestCase):
 
40
 
 
41
    def test_stanza(self):
 
42
        """Construct rio stanza in memory"""
 
43
        s = Stanza(number='42', name="fred")
 
44
        self.assertTrue('number' in s)
 
45
        self.assertFalse('color' in s)
 
46
        self.assertFalse('42' in s)
 
47
        self.assertEquals(list(s.iter_pairs()),
 
48
                [('name', 'fred'), ('number', '42')])
 
49
        self.assertEquals(s.get('number'), '42')
 
50
        self.assertEquals(s.get('name'), 'fred')
 
51
 
 
52
    def test_value_checks(self):
 
53
        """rio checks types on construction"""
 
54
        # these aren't enforced at construction time
 
55
        ## self.assertRaises(ValueError,
 
56
        ##        Stanza, complex=42 + 3j)
 
57
        ## self.assertRaises(ValueError, 
 
58
        ##        Stanza, several=range(10))
 
59
 
 
60
    def test_empty_value(self):
 
61
        """Serialize stanza with empty field"""
 
62
        s = Stanza(empty='')
 
63
        self.assertEqualDiff(s.to_string(),
 
64
                "empty: \n")
 
65
 
 
66
    def test_to_lines(self):
 
67
        """Write simple rio stanza to string"""
 
68
        s = Stanza(number='42', name='fred')
 
69
        self.assertEquals(list(s.to_lines()),
 
70
                ['name: fred\n',
 
71
                 'number: 42\n'])
 
72
 
 
73
    def test_as_dict(self):
 
74
        """Convert rio Stanza to dictionary"""
 
75
        s = Stanza(number='42', name='fred')
 
76
        sd = s.as_dict()
 
77
        self.assertEquals(sd, dict(number='42', name='fred'))
 
78
 
 
79
    def test_to_file(self):
 
80
        """Write rio to file"""
 
81
        tmpf = TemporaryFile()
 
82
        s = Stanza(a_thing='something with "quotes like \\"this\\""', number='42', name='fred')
 
83
        s.write(tmpf)
 
84
        tmpf.seek(0)
 
85
        self.assertEqualDiff(tmpf.read(), r'''
 
86
a_thing: something with "quotes like \"this\""
 
87
name: fred
 
88
number: 42
 
89
'''[1:])
 
90
 
 
91
    def test_multiline_string(self):
 
92
        tmpf = TemporaryFile()
 
93
        s = Stanza(motto="war is peace\nfreedom is slavery\nignorance is strength")
 
94
        s.write(tmpf)
 
95
        tmpf.seek(0)
 
96
        self.assertEqualDiff(tmpf.read(), '''\
 
97
motto: war is peace
 
98
\tfreedom is slavery
 
99
\tignorance is strength
 
100
''')
 
101
        tmpf.seek(0)
 
102
        s2 = read_stanza(tmpf)
 
103
        self.assertEquals(s, s2)
 
104
 
 
105
    def test_read_stanza(self):
 
106
        """Load stanza from string"""
 
107
        lines = """\
 
108
revision: mbp@sourcefrog.net-123-abc
 
109
timestamp: 1130653962
 
110
timezone: 36000
 
111
committer: Martin Pool <mbp@test.sourcefrog.net>
 
112
""".splitlines(True)
 
113
        s = read_stanza(lines)
 
114
        self.assertTrue('revision' in s)
 
115
        self.assertEqualDiff(s.get('revision'), 'mbp@sourcefrog.net-123-abc')
 
116
        self.assertEquals(list(s.iter_pairs()),
 
117
                [('revision', 'mbp@sourcefrog.net-123-abc'),
 
118
                 ('timestamp', '1130653962'),
 
119
                 ('timezone', '36000'),
 
120
                 ('committer', "Martin Pool <mbp@test.sourcefrog.net>")])
 
121
        self.assertEquals(len(s), 4)
 
122
 
 
123
    def test_repeated_field(self):
 
124
        """Repeated field in rio"""
 
125
        s = Stanza()
 
126
        for k, v in [('a', '10'), ('b', '20'), ('a', '100'), ('b', '200'), 
 
127
                     ('a', '1000'), ('b', '2000')]:
 
128
            s.add(k, v)
 
129
        s2 = read_stanza(s.to_lines())
 
130
        self.assertEquals(s, s2)
 
131
        self.assertEquals(s.get_all('a'), map(str, [10, 100, 1000]))
 
132
        self.assertEquals(s.get_all('b'), map(str, [20, 200, 2000]))
 
133
 
 
134
    def test_backslash(self):
 
135
        s = Stanza(q='\\')
 
136
        t = s.to_string()
 
137
        self.assertEqualDiff(t, 'q: \\\n')
 
138
        s2 = read_stanza(s.to_lines())
 
139
        self.assertEquals(s, s2)
 
140
 
 
141
    def test_blank_line(self):
 
142
        s = Stanza(none='', one='\n', two='\n\n')
 
143
        self.assertEqualDiff(s.to_string(), """\
 
144
none: 
 
145
one: 
 
146
\t
 
147
two: 
 
148
\t
 
149
\t
 
150
""")
 
151
        s2 = read_stanza(s.to_lines())
 
152
        self.assertEquals(s, s2)
 
153
 
 
154
    def test_whitespace_value(self):
 
155
        s = Stanza(space=' ', tabs='\t\t\t', combo='\n\t\t\n')
 
156
        self.assertEqualDiff(s.to_string(), """\
 
157
combo: 
 
158
\t\t\t
 
159
\t
 
160
space:  
 
161
tabs: \t\t\t
 
162
""")
 
163
        s2 = read_stanza(s.to_lines())
 
164
        self.assertEquals(s, s2)
 
165
        self.rio_file_stanzas([s])
 
166
 
 
167
    def test_quoted(self):
 
168
        """rio quoted string cases"""
 
169
        s = Stanza(q1='"hello"', 
 
170
                   q2=' "for', 
 
171
                   q3='\n\n"for"\n',
 
172
                   q4='for\n"\nfor',
 
173
                   q5='\n',
 
174
                   q6='"', 
 
175
                   q7='""',
 
176
                   q8='\\',
 
177
                   q9='\\"\\"',
 
178
                   )
 
179
        s2 = read_stanza(s.to_lines())
 
180
        self.assertEquals(s, s2)
 
181
        # apparent bug in read_stanza
 
182
        # s3 = read_stanza(self.stanzas_to_str([s]))
 
183
        # self.assertEquals(s, s3)
 
184
 
 
185
    def test_read_empty(self):
 
186
        """Detect end of rio file"""
 
187
        s = read_stanza([])
 
188
        self.assertEqual(s, None)
 
189
        self.assertTrue(s is None)
 
190
        
 
191
    def test_read_iter(self):
 
192
        """Read several stanzas from file"""
 
193
        tmpf = TemporaryFile()
 
194
        tmpf.write("""\
 
195
version_header: 1
 
196
 
 
197
name: foo
 
198
val: 123
 
199
 
 
200
name: bar
 
201
val: 129319
 
202
""")
 
203
        tmpf.seek(0)
 
204
        reader = read_stanzas(tmpf)
 
205
        read_iter = iter(reader)
 
206
        stuff = list(reader)
 
207
        self.assertEqual(stuff, 
 
208
                [ Stanza(version_header='1'),
 
209
                  Stanza(name="foo", val='123'),
 
210
                  Stanza(name="bar", val='129319'), ])
 
211
 
 
212
    def test_read_several(self):
 
213
        """Read several stanzas from file"""
 
214
        tmpf = TemporaryFile()
 
215
        tmpf.write("""\
 
216
version_header: 1
 
217
 
 
218
name: foo
 
219
val: 123
 
220
 
 
221
name: quoted
 
222
address:   "Willowglen"
 
223
\t  42 Wallaby Way
 
224
\t  Sydney
 
225
 
 
226
name: bar
 
227
val: 129319
 
228
""")
 
229
        tmpf.seek(0)
 
230
        s = read_stanza(tmpf)
 
231
        self.assertEquals(s, Stanza(version_header='1'))
 
232
        s = read_stanza(tmpf)
 
233
        self.assertEquals(s, Stanza(name="foo", val='123'))
 
234
        s = read_stanza(tmpf)
 
235
        self.assertEqualDiff(s.get('name'), 'quoted')
 
236
        self.assertEqualDiff(s.get('address'), '  "Willowglen"\n  42 Wallaby Way\n  Sydney')
 
237
        s = read_stanza(tmpf)
 
238
        self.assertEquals(s, Stanza(name="bar", val='129319'))
 
239
        s = read_stanza(tmpf)
 
240
        self.assertEquals(s, None)
 
241
        self.check_rio_file(tmpf)
 
242
 
 
243
    def check_rio_file(self, real_file):
 
244
        real_file.seek(0)
 
245
        read_write = rio_file(RioReader(real_file)).read()
 
246
        real_file.seek(0)
 
247
        self.assertEquals(read_write, real_file.read())
 
248
 
 
249
    @staticmethod
 
250
    def stanzas_to_str(stanzas):
 
251
        return rio_file(stanzas).read()
 
252
 
 
253
    def rio_file_stanzas(self, stanzas):
 
254
        new_stanzas = list(RioReader(rio_file(stanzas)))
 
255
        self.assertEqual(new_stanzas, stanzas)
 
256
 
 
257
    def test_tricky_quoted(self):
 
258
        tmpf = TemporaryFile()
 
259
        tmpf.write('''\
 
260
s: "one"
 
261
 
 
262
s: 
 
263
\t"one"
 
264
\t
 
265
 
 
266
s: "
 
267
 
 
268
s: ""
 
269
 
 
270
s: """
 
271
 
 
272
s: 
 
273
\t
 
274
 
 
275
s: \\
 
276
 
 
277
s: 
 
278
\t\\
 
279
\t\\\\
 
280
\t
 
281
 
 
282
s: word\\
 
283
 
 
284
s: quote"
 
285
 
 
286
s: backslashes\\\\\\
 
287
 
 
288
s: both\\\"
 
289
 
 
290
''')
 
291
        tmpf.seek(0)
 
292
        expected_vals = ['"one"',
 
293
            '\n"one"\n',
 
294
            '"',
 
295
            '""',
 
296
            '"""',
 
297
            '\n',
 
298
            '\\',
 
299
            '\n\\\n\\\\\n',
 
300
            'word\\',
 
301
            'quote\"',
 
302
            'backslashes\\\\\\',
 
303
            'both\\\"',
 
304
            ]
 
305
        for expected in expected_vals:
 
306
            stanza = read_stanza(tmpf)
 
307
            self.rio_file_stanzas([stanza])
 
308
            self.assertEquals(len(stanza), 1)
 
309
            self.assertEqualDiff(stanza.get('s'), expected)
 
310
 
 
311
    def test_write_empty_stanza(self):
 
312
        """Write empty stanza"""
 
313
        l = list(Stanza().to_lines())
 
314
        self.assertEquals(l, [])
 
315
 
 
316
    def test_rio_raises_type_error(self):
 
317
        """TypeError on adding invalid type to Stanza"""
 
318
        s = Stanza()
 
319
        self.assertRaises(TypeError, s.add, 'foo', {})
 
320
 
 
321
    def test_rio_raises_type_error_key(self):
 
322
        """TypeError on adding invalid type to Stanza"""
 
323
        s = Stanza()
 
324
        self.assertRaises(TypeError, s.add, 10, {})
 
325
 
 
326
    def test_rio_unicode(self):
 
327
        uni_data = u'\N{KATAKANA LETTER O}'
 
328
        s = Stanza(foo=uni_data)
 
329
        self.assertEquals(s.get('foo'), uni_data)
 
330
        raw_lines = s.to_lines()
 
331
        self.assertEquals(raw_lines,
 
332
                ['foo: ' + uni_data.encode('utf-8') + '\n'])
 
333
        new_s = read_stanza(raw_lines)
 
334
        self.assertEquals(new_s.get('foo'), uni_data)
 
335
 
 
336
    def test_rio_to_unicode(self):
 
337
        uni_data = u'\N{KATAKANA LETTER O}'
 
338
        s = Stanza(foo=uni_data)
 
339
        unicode_str = s.to_unicode()
 
340
        self.assertEqual(u'foo: %s\n' % (uni_data,), unicode_str)
 
341
        new_s = rio.read_stanza_unicode(unicode_str.splitlines(True))
 
342
        self.assertEqual(uni_data, new_s.get('foo'))
 
343
 
 
344
    def test_nested_rio_unicode(self):
 
345
        uni_data = u'\N{KATAKANA LETTER O}'
 
346
        s = Stanza(foo=uni_data)
 
347
        parent_stanza = Stanza(child=s.to_unicode())
 
348
        raw_lines = parent_stanza.to_lines()
 
349
        self.assertEqual(['child: foo: ' + uni_data.encode('utf-8') + '\n',
 
350
                          '\t\n',
 
351
                         ], raw_lines)
 
352
        new_parent = read_stanza(raw_lines)
 
353
        child_text = new_parent.get('child')
 
354
        self.assertEqual(u'foo: %s\n' % uni_data, child_text)
 
355
        new_child = rio.read_stanza_unicode(child_text.splitlines(True))
 
356
        self.assertEqual(uni_data, new_child.get('foo'))
 
357
 
 
358
    def mail_munge(self, lines, dos_nl=True):
 
359
        new_lines = []
 
360
        for line in lines:
 
361
            line = re.sub(' *\n', '\n', line)
 
362
            if dos_nl:
 
363
                line = re.sub('([^\r])\n', '\\1\r\n', line)
 
364
            new_lines.append(line)
 
365
        return new_lines
 
366
 
 
367
    def test_patch_rio(self):
 
368
        stanza = Stanza(data='#\n\r\\r ', space=' ' * 255, hash='#' * 255)
 
369
        lines = rio.to_patch_lines(stanza)
 
370
        for line in lines:
 
371
            self.assertContainsRe(line, '^# ')
 
372
            self.assertTrue(72 >= len(line))
 
373
        for line in rio.to_patch_lines(stanza, max_width=12):
 
374
            self.assertTrue(12 >= len(line))
 
375
        new_stanza = rio.read_patch_stanza(self.mail_munge(lines,
 
376
                                                           dos_nl=False))
 
377
        lines = self.mail_munge(lines)
 
378
        new_stanza = rio.read_patch_stanza(lines)
 
379
        self.assertEqual('#\n\r\\r ', new_stanza.get('data'))
 
380
        self.assertEqual(' '* 255, new_stanza.get('space'))
 
381
        self.assertEqual('#'* 255, new_stanza.get('hash'))
 
382
 
 
383
    def test_patch_rio_linebreaks(self):
 
384
        stanza = Stanza(breaktest='linebreak -/'*30)
 
385
        self.assertContainsRe(rio.to_patch_lines(stanza, 71)[0],
 
386
                              'linebreak\\\\\n')
 
387
        stanza = Stanza(breaktest='linebreak-/'*30)
 
388
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
389
                              'linebreak-\\\\\n')
 
390
        stanza = Stanza(breaktest='linebreak/'*30)
 
391
        self.assertContainsRe(rio.to_patch_lines(stanza, 70)[0],
 
392
                              'linebreak\\\\\n')