~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_pack.py

  • Committer: Canonical.com Patch Queue Manager
  • Date: 2007-11-04 18:51:39 UTC
  • mfrom: (2961.1.1 trunk)
  • Revision ID: pqm@pqm.ubuntu.com-20071104185139-kaio3sneodg2kp71
Authentication ring implementation (read-only)

Show diffs side-by-side

added

removed

Lines of Context:
22
22
from bzrlib import pack, errors, tests
23
23
 
24
24
 
25
 
class TestContainerSerialiser(tests.TestCase):
26
 
    """Tests for the ContainerSerialiser class."""
27
 
 
28
 
    def test_construct(self):
29
 
        """Test constructing a ContainerSerialiser."""
30
 
        pack.ContainerSerialiser()
31
 
 
32
 
    def test_begin(self):
33
 
        serialiser = pack.ContainerSerialiser()
34
 
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
35
 
                         serialiser.begin())
36
 
 
37
 
    def test_end(self):
38
 
        serialiser = pack.ContainerSerialiser()
39
 
        self.assertEqual('E', serialiser.end())
40
 
 
41
 
    def test_bytes_record_no_name(self):
42
 
        serialiser = pack.ContainerSerialiser()
43
 
        record = serialiser.bytes_record('bytes', [])
44
 
        self.assertEqual('B5\n\nbytes', record)
45
 
        
46
 
    def test_bytes_record_one_name_with_one_part(self):
47
 
        serialiser = pack.ContainerSerialiser()
48
 
        record = serialiser.bytes_record('bytes', [('name',)])
49
 
        self.assertEqual('B5\nname\n\nbytes', record)
50
 
        
51
 
    def test_bytes_record_one_name_with_two_parts(self):
52
 
        serialiser = pack.ContainerSerialiser()
53
 
        record = serialiser.bytes_record('bytes', [('part1', 'part2')])
54
 
        self.assertEqual('B5\npart1\x00part2\n\nbytes', record)
55
 
        
56
 
    def test_bytes_record_two_names(self):
57
 
        serialiser = pack.ContainerSerialiser()
58
 
        record = serialiser.bytes_record('bytes', [('name1',), ('name2',)])
59
 
        self.assertEqual('B5\nname1\nname2\n\nbytes', record)
60
 
 
61
 
    def test_bytes_record_whitespace_in_name_part(self):
62
 
        serialiser = pack.ContainerSerialiser()
63
 
        self.assertRaises(
64
 
            errors.InvalidRecordError,
65
 
            serialiser.bytes_record, 'bytes', [('bad name',)])
66
 
 
67
 
 
68
25
class TestContainerWriter(tests.TestCase):
69
26
 
70
 
    def setUp(self):
71
 
        self.output = StringIO()
72
 
        self.writer = pack.ContainerWriter(self.output.write)
73
 
 
74
 
    def assertOutput(self, expected_output):
75
 
        """Assert that the output of self.writer ContainerWriter is equal to
76
 
        expected_output.
77
 
        """
78
 
        self.assertEqual(expected_output, self.output.getvalue())
79
 
 
80
27
    def test_construct(self):
81
28
        """Test constructing a ContainerWriter.
82
29
        
83
 
        This uses None as the output stream to show that the constructor
84
 
        doesn't try to use the output stream.
 
30
        This uses None as the output stream to show that the constructor doesn't
 
31
        try to use the output stream.
85
32
        """
86
33
        writer = pack.ContainerWriter(None)
87
34
 
88
35
    def test_begin(self):
89
36
        """The begin() method writes the container format marker line."""
90
 
        self.writer.begin()
91
 
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\n')
 
37
        output = StringIO()
 
38
        writer = pack.ContainerWriter(output.write)
 
39
        writer.begin()
 
40
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\n',
 
41
                         output.getvalue())
92
42
 
93
43
    def test_zero_records_written_after_begin(self):
94
44
        """After begin is written, 0 records have been written."""
95
 
        self.writer.begin()
96
 
        self.assertEqual(0, self.writer.records_written)
 
45
        output = StringIO()
 
46
        writer = pack.ContainerWriter(output.write)
 
47
        writer.begin()
 
48
        self.assertEqual(0, writer.records_written)
97
49
 
98
50
    def test_end(self):
99
51
        """The end() method writes an End Marker record."""
100
 
        self.writer.begin()
101
 
        self.writer.end()
102
 
        self.assertOutput('Bazaar pack format 1 (introduced in 0.18)\nE')
 
52
        output = StringIO()
 
53
        writer = pack.ContainerWriter(output.write)
 
54
        writer.begin()
 
55
        writer.end()
 
56
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nE',
 
57
                         output.getvalue())
103
58
 
104
59
    def test_empty_end_does_not_add_a_record_to_records_written(self):
105
60
        """The end() method does not count towards the records written."""
106
 
        self.writer.begin()
107
 
        self.writer.end()
108
 
        self.assertEqual(0, self.writer.records_written)
 
61
        output = StringIO()
 
62
        writer = pack.ContainerWriter(output.write)
 
63
        writer.begin()
 
64
        writer.end()
 
65
        self.assertEqual(0, writer.records_written)
109
66
 
110
67
    def test_non_empty_end_does_not_add_a_record_to_records_written(self):
111
68
        """The end() method does not count towards the records written."""
112
 
        self.writer.begin()
113
 
        self.writer.add_bytes_record('foo', names=[])
114
 
        self.writer.end()
115
 
        self.assertEqual(1, self.writer.records_written)
 
69
        output = StringIO()
 
70
        writer = pack.ContainerWriter(output.write)
 
71
        writer.begin()
 
72
        writer.add_bytes_record('foo', names=[])
 
73
        writer.end()
 
74
        self.assertEqual(1, writer.records_written)
116
75
 
117
76
    def test_add_bytes_record_no_name(self):
118
77
        """Add a bytes record with no name."""
119
 
        self.writer.begin()
120
 
        offset, length = self.writer.add_bytes_record('abc', names=[])
 
78
        output = StringIO()
 
79
        writer = pack.ContainerWriter(output.write)
 
80
        writer.begin()
 
81
        offset, length = writer.add_bytes_record('abc', names=[])
121
82
        self.assertEqual((42, 7), (offset, length))
122
 
        self.assertOutput(
123
 
            'Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc')
 
83
        self.assertEqual('Bazaar pack format 1 (introduced in 0.18)\nB3\n\nabc',
 
84
                         output.getvalue())
124
85
 
125
86
    def test_add_bytes_record_one_name(self):
126
87
        """Add a bytes record with one name."""
127
 
        self.writer.begin()
128
 
        offset, length = self.writer.add_bytes_record(
129
 
            'abc', names=[('name1', )])
 
88
        output = StringIO()
 
89
        writer = pack.ContainerWriter(output.write)
 
90
        writer.begin()
 
91
        offset, length = writer.add_bytes_record('abc', names=[('name1', )])
130
92
        self.assertEqual((42, 13), (offset, length))
131
 
        self.assertOutput(
132
 
            'Bazaar pack format 1 (introduced in 0.18)\n'
133
 
            'B3\nname1\n\nabc')
134
 
 
135
 
    def test_add_bytes_record_two_names(self):
136
 
        """Add a bytes record with two names."""
137
 
        self.writer.begin()
138
 
        offset, length = self.writer.add_bytes_record(
139
 
            'abc', names=[('name1', ), ('name2', )])
140
 
        self.assertEqual((42, 19), (offset, length))
141
 
        self.assertOutput(
142
 
            'Bazaar pack format 1 (introduced in 0.18)\n'
143
 
            'B3\nname1\nname2\n\nabc')
144
 
 
145
 
    def test_add_bytes_record_two_names(self):
146
 
        """Add a bytes record with two names."""
147
 
        self.writer.begin()
148
 
        offset, length = self.writer.add_bytes_record(
149
 
            'abc', names=[('name1', ), ('name2', )])
150
 
        self.assertEqual((42, 19), (offset, length))
151
 
        self.assertOutput(
152
 
            'Bazaar pack format 1 (introduced in 0.18)\n'
153
 
            'B3\nname1\nname2\n\nabc')
 
93
        self.assertEqual(
 
94
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
95
            'B3\nname1\n\nabc',
 
96
            output.getvalue())
 
97
 
 
98
    def test_add_bytes_record_two_names(self):
 
99
        """Add a bytes record with two names."""
 
100
        output = StringIO()
 
101
        writer = pack.ContainerWriter(output.write)
 
102
        writer.begin()
 
103
        offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
 
104
        self.assertEqual((42, 19), (offset, length))
 
105
        self.assertEqual(
 
106
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
107
            'B3\nname1\nname2\n\nabc',
 
108
            output.getvalue())
 
109
 
 
110
    def test_add_bytes_record_two_names(self):
 
111
        """Add a bytes record with two names."""
 
112
        output = StringIO()
 
113
        writer = pack.ContainerWriter(output.write)
 
114
        writer.begin()
 
115
        offset, length = writer.add_bytes_record('abc', names=[('name1', ), ('name2', )])
 
116
        self.assertEqual((42, 19), (offset, length))
 
117
        self.assertEqual(
 
118
            'Bazaar pack format 1 (introduced in 0.18)\n'
 
119
            'B3\nname1\nname2\n\nabc',
 
120
            output.getvalue())
154
121
 
155
122
    def test_add_bytes_record_two_element_name(self):
156
123
        """Add a bytes record with a two-element name."""
157
 
        self.writer.begin()
158
 
        offset, length = self.writer.add_bytes_record(
159
 
            'abc', names=[('name1', 'name2')])
 
124
        output = StringIO()
 
125
        writer = pack.ContainerWriter(output.write)
 
126
        writer.begin()
 
127
        offset, length = writer.add_bytes_record('abc', names=[('name1', 'name2')])
160
128
        self.assertEqual((42, 19), (offset, length))
161
 
        self.assertOutput(
 
129
        self.assertEqual(
162
130
            'Bazaar pack format 1 (introduced in 0.18)\n'
163
 
            'B3\nname1\x00name2\n\nabc')
 
131
            'B3\nname1\x00name2\n\nabc',
 
132
            output.getvalue())
164
133
 
165
134
    def test_add_second_bytes_record_gets_higher_offset(self):
166
 
        self.writer.begin()
167
 
        self.writer.add_bytes_record('abc', names=[])
168
 
        offset, length = self.writer.add_bytes_record('abc', names=[])
 
135
        output = StringIO()
 
136
        writer = pack.ContainerWriter(output.write)
 
137
        writer.begin()
 
138
        writer.add_bytes_record('abc', names=[])
 
139
        offset, length = writer.add_bytes_record('abc', names=[])
169
140
        self.assertEqual((49, 7), (offset, length))
170
 
        self.assertOutput(
 
141
        self.assertEqual(
171
142
            'Bazaar pack format 1 (introduced in 0.18)\n'
172
143
            'B3\n\nabc'
173
 
            'B3\n\nabc')
 
144
            'B3\n\nabc',
 
145
            output.getvalue())
174
146
 
175
147
    def test_add_bytes_record_invalid_name(self):
176
148
        """Adding a Bytes record with a name with whitespace in it raises
177
149
        InvalidRecordError.
178
150
        """
179
 
        self.writer.begin()
 
151
        output = StringIO()
 
152
        writer = pack.ContainerWriter(output.write)
 
153
        writer.begin()
180
154
        self.assertRaises(
181
155
            errors.InvalidRecordError,
182
 
            self.writer.add_bytes_record, 'abc', names=[('bad name', )])
 
156
            writer.add_bytes_record, 'abc', names=[('bad name', )])
183
157
 
184
158
    def test_add_bytes_records_add_to_records_written(self):
185
159
        """Adding a Bytes record increments the records_written counter."""
186
 
        self.writer.begin()
187
 
        self.writer.add_bytes_record('foo', names=[])
188
 
        self.assertEqual(1, self.writer.records_written)
189
 
        self.writer.add_bytes_record('foo', names=[])
190
 
        self.assertEqual(2, self.writer.records_written)
 
160
        output = StringIO()
 
161
        writer = pack.ContainerWriter(output.write)
 
162
        writer.begin()
 
163
        writer.add_bytes_record('foo', names=[])
 
164
        self.assertEqual(1, writer.records_written)
 
165
        writer.add_bytes_record('foo', names=[])
 
166
        self.assertEqual(2, writer.records_written)
191
167
 
192
168
 
193
169
class TestContainerReader(tests.TestCase):
194
 
    """Tests for the ContainerReader.
195
 
 
196
 
    The ContainerReader reads format 1 containers, so these tests explicitly
197
 
    test how it reacts to format 1 data.  If a new version of the format is
198
 
    added, then separate tests for that format should be added.
199
 
    """
200
170
 
201
171
    def get_reader_for(self, bytes):
202
172
        stream = StringIO(bytes)
329
299
        
330
300
 
331
301
class TestBytesRecordReader(tests.TestCase):
332
 
    """Tests for reading and validating Bytes records with
333
 
    BytesRecordReader.
334
 
    
335
 
    Like TestContainerReader, this explicitly tests the reading of format 1
336
 
    data.  If a new version of the format is added, then a separate set of
337
 
    tests for reading that format should be added.
338
 
    """
 
302
    """Tests for reading and validating Bytes records with BytesRecordReader."""
339
303
 
340
304
    def get_reader_for(self, bytes):
341
305
        stream = StringIO(bytes)
559
523
        results.append(f.readline())
560
524
        results.append(f.read(4))
561
525
        self.assertEqual(['0', '\n', '2\n4\n'], results)
562
 
 
563
 
 
564
 
class PushParserTestCase(tests.TestCase):
565
 
    """Base class for TestCases involving ContainerPushParser."""
566
 
 
567
 
    def make_parser_expecting_record_type(self):
568
 
        parser = pack.ContainerPushParser()
569
 
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\n")
570
 
        return parser
571
 
 
572
 
    def make_parser_expecting_bytes_record(self):
573
 
        parser = pack.ContainerPushParser()
574
 
        parser.accept_bytes("Bazaar pack format 1 (introduced in 0.18)\nB")
575
 
        return parser
576
 
 
577
 
    def assertRecordParsing(self, expected_record, bytes):
578
 
        """Assert that 'bytes' is parsed as a given bytes record.
579
 
 
580
 
        :param expected_record: A tuple of (names, bytes).
581
 
        """
582
 
        parser = self.make_parser_expecting_bytes_record()
583
 
        parser.accept_bytes(bytes)
584
 
        parsed_records = parser.read_pending_records()
585
 
        self.assertEqual([expected_record], parsed_records)
586
 
 
587
 
        
588
 
class TestContainerPushParser(PushParserTestCase):
589
 
    """Tests for ContainerPushParser.
590
 
    
591
 
    The ContainerPushParser reads format 1 containers, so these tests
592
 
    explicitly test how it reacts to format 1 data.  If a new version of the
593
 
    format is added, then separate tests for that format should be added.
594
 
    """
595
 
 
596
 
    def test_construct(self):
597
 
        """ContainerPushParser can be constructed."""
598
 
        pack.ContainerPushParser()
599
 
 
600
 
    def test_multiple_records_at_once(self):
601
 
        """If multiple records worth of data are fed to the parser in one
602
 
        string, the parser will correctly parse all the records.
603
 
 
604
 
        (A naive implementation might stop after parsing the first record.)
605
 
        """
606
 
        parser = self.make_parser_expecting_record_type()
607
 
        parser.accept_bytes("B5\nname1\n\nbody1B5\nname2\n\nbody2")
608
 
        self.assertEqual(
609
 
            [([('name1',)], 'body1'), ([('name2',)], 'body2')],
610
 
            parser.read_pending_records())
611
 
 
612
 
 
613
 
class TestContainerPushParserBytesParsing(PushParserTestCase):
614
 
    """Tests for reading Bytes records with ContainerPushParser.
615
 
    
616
 
    The ContainerPushParser reads format 1 containers, so these tests
617
 
    explicitly test how it reacts to format 1 data.  If a new version of the
618
 
    format is added, then separate tests for that format should be added.
619
 
    """
620
 
 
621
 
    def test_record_with_no_name(self):
622
 
        """Reading a Bytes record with no name returns an empty list of
623
 
        names.
624
 
        """
625
 
        self.assertRecordParsing(([], 'aaaaa'), "5\n\naaaaa")
626
 
 
627
 
    def test_record_with_one_name(self):
628
 
        """Reading a Bytes record with one name returns a list of just that
629
 
        name.
630
 
        """
631
 
        self.assertRecordParsing(
632
 
            ([('name1', )], 'aaaaa'),
633
 
            "5\nname1\n\naaaaa")
634
 
 
635
 
    def test_record_with_two_names(self):
636
 
        """Reading a Bytes record with two names returns a list of both names.
637
 
        """
638
 
        self.assertRecordParsing(
639
 
            ([('name1', ), ('name2', )], 'aaaaa'),
640
 
            "5\nname1\nname2\n\naaaaa")
641
 
 
642
 
    def test_record_with_two_part_names(self):
643
 
        """Reading a Bytes record with a two_part name reads both."""
644
 
        self.assertRecordParsing(
645
 
            ([('name1', 'name2')], 'aaaaa'),
646
 
            "5\nname1\x00name2\n\naaaaa")
647
 
 
648
 
    def test_invalid_length(self):
649
 
        """If the length-prefix is not a number, parsing raises
650
 
        InvalidRecordError.
651
 
        """
652
 
        parser = self.make_parser_expecting_bytes_record()
653
 
        self.assertRaises(
654
 
            errors.InvalidRecordError, parser.accept_bytes, "not a number\n")
655
 
 
656
 
    def test_incomplete_record(self):
657
 
        """If the bytes seen so far don't form a complete record, then there
658
 
        will be nothing returned by read_pending_records.
659
 
        """
660
 
        parser = self.make_parser_expecting_bytes_record()
661
 
        parser.accept_bytes("5\n\nabcd")
662
 
        self.assertEqual([], parser.read_pending_records())
663
 
 
664
 
    def test_accept_nothing(self):
665
 
        """The edge case of parsing an empty string causes no error."""
666
 
        parser = self.make_parser_expecting_bytes_record()
667
 
        parser.accept_bytes("")
668
 
 
669
 
    def assertInvalidRecord(self, bytes):
670
 
        """Assert that parsing the given bytes will raise an
671
 
        InvalidRecordError.
672
 
        """
673
 
        parser = self.make_parser_expecting_bytes_record()
674
 
        self.assertRaises(
675
 
            errors.InvalidRecordError, parser.accept_bytes, bytes)
676
 
 
677
 
    def test_read_invalid_name_whitespace(self):
678
 
        """Names must have no whitespace."""
679
 
        # A name with a space.
680
 
        self.assertInvalidRecord("0\nbad name\n\n")
681
 
 
682
 
        # A name with a tab.
683
 
        self.assertInvalidRecord("0\nbad\tname\n\n")
684
 
 
685
 
        # A name with a vertical tab.
686
 
        self.assertInvalidRecord("0\nbad\vname\n\n")
687
 
 
688
 
    def test_repeated_read_pending_records(self):
689
 
        """read_pending_records will not return the same record twice."""
690
 
        parser = self.make_parser_expecting_bytes_record()
691
 
        parser.accept_bytes("6\n\nabcdef")
692
 
        self.assertEqual([([], 'abcdef')], parser.read_pending_records())
693
 
        self.assertEqual([], parser.read_pending_records())
694
 
 
695