~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Robert Collins
  • Date: 2007-04-23 02:29:35 UTC
  • mfrom: (2441 +trunk)
  • mto: This revision was merged to the branch mainline in revision 2442.
  • Revision ID: robertc@robertcollins.net-20070423022935-9hhongamvk6bfdso
Resolve conflicts with bzr.dev.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
from cStringIO import StringIO
20
20
import difflib
 
21
import gzip
 
22
import sha
21
23
 
22
24
from bzrlib import (
23
25
    errors,
33
35
    KnitVersionedFile,
34
36
    KnitPlainFactory,
35
37
    KnitAnnotateFactory,
 
38
    _KnitData,
36
39
    _KnitIndex,
37
40
    WeaveToKnit,
38
41
    )
109
112
        else:
110
113
            return StringIO("\n".join(self.file_lines))
111
114
 
 
115
    def readv(self, relpath, offsets):
 
116
        fp = self.get(relpath)
 
117
        for offset, size in offsets:
 
118
            fp.seek(offset)
 
119
            yield offset, fp.read(size)
 
120
 
112
121
    def __getattr__(self, name):
113
122
        def queue_call(*args, **kwargs):
114
123
            self.calls.append((name, args, kwargs))
115
124
        return queue_call
116
125
 
117
126
 
 
127
class LowLevelKnitDataTests(TestCase):
 
128
 
 
129
    def create_gz_content(self, text):
 
130
        sio = StringIO()
 
131
        gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
 
132
        gz_file.write(text)
 
133
        gz_file.close()
 
134
        return sio.getvalue()
 
135
 
 
136
    def test_valid_knit_data(self):
 
137
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
138
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
139
                                        'foo\n'
 
140
                                        'bar\n'
 
141
                                        'end rev-id-1\n'
 
142
                                        % (sha1sum,))
 
143
        transport = MockTransport([gz_txt])
 
144
        data = _KnitData(transport, 'filename', mode='r')
 
145
        records = [('rev-id-1', 0, len(gz_txt))]
 
146
 
 
147
        contents = data.read_records(records)
 
148
        self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
 
149
 
 
150
        raw_contents = list(data.read_records_iter_raw(records))
 
151
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
152
 
 
153
    def test_not_enough_lines(self):
 
154
        sha1sum = sha.new('foo\n').hexdigest()
 
155
        # record says 2 lines data says 1
 
156
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
157
                                        'foo\n'
 
158
                                        'end rev-id-1\n'
 
159
                                        % (sha1sum,))
 
160
        transport = MockTransport([gz_txt])
 
161
        data = _KnitData(transport, 'filename', mode='r')
 
162
        records = [('rev-id-1', 0, len(gz_txt))]
 
163
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
164
 
 
165
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
166
        raw_contents = list(data.read_records_iter_raw(records))
 
167
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
168
 
 
169
    def test_too_many_lines(self):
 
170
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
171
        # record says 1 lines data says 2
 
172
        gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
 
173
                                        'foo\n'
 
174
                                        'bar\n'
 
175
                                        'end rev-id-1\n'
 
176
                                        % (sha1sum,))
 
177
        transport = MockTransport([gz_txt])
 
178
        data = _KnitData(transport, 'filename', mode='r')
 
179
        records = [('rev-id-1', 0, len(gz_txt))]
 
180
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
181
 
 
182
        # read_records_iter_raw won't detect that sort of mismatch/corruption
 
183
        raw_contents = list(data.read_records_iter_raw(records))
 
184
        self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
 
185
 
 
186
    def test_mismatched_version_id(self):
 
187
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
188
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
189
                                        'foo\n'
 
190
                                        'bar\n'
 
191
                                        'end rev-id-1\n'
 
192
                                        % (sha1sum,))
 
193
        transport = MockTransport([gz_txt])
 
194
        data = _KnitData(transport, 'filename', mode='r')
 
195
        # We are asking for rev-id-2, but the data is rev-id-1
 
196
        records = [('rev-id-2', 0, len(gz_txt))]
 
197
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
198
 
 
199
        # read_records_iter_raw will notice if we request the wrong version.
 
200
        self.assertRaises(errors.KnitCorrupt, list,
 
201
                          data.read_records_iter_raw(records))
 
202
 
 
203
    def test_uncompressed_data(self):
 
204
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
205
        txt = ('version rev-id-1 2 %s\n'
 
206
               'foo\n'
 
207
               'bar\n'
 
208
               'end rev-id-1\n'
 
209
               % (sha1sum,))
 
210
        transport = MockTransport([txt])
 
211
        data = _KnitData(transport, 'filename', mode='r')
 
212
        records = [('rev-id-1', 0, len(txt))]
 
213
 
 
214
        # We don't have valid gzip data ==> corrupt
 
215
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
216
 
 
217
        # read_records_iter_raw will notice the bad data
 
218
        self.assertRaises(errors.KnitCorrupt, list,
 
219
                          data.read_records_iter_raw(records))
 
220
 
 
221
    def test_corrupted_data(self):
 
222
        sha1sum = sha.new('foo\nbar\n').hexdigest()
 
223
        gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
 
224
                                        'foo\n'
 
225
                                        'bar\n'
 
226
                                        'end rev-id-1\n'
 
227
                                        % (sha1sum,))
 
228
        # Change 2 bytes in the middle to \xff
 
229
        gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
 
230
        transport = MockTransport([gz_txt])
 
231
        data = _KnitData(transport, 'filename', mode='r')
 
232
        records = [('rev-id-1', 0, len(gz_txt))]
 
233
 
 
234
        self.assertRaises(errors.KnitCorrupt, data.read_records, records)
 
235
 
 
236
        # read_records_iter_raw will notice if we request the wrong version.
 
237
        self.assertRaises(errors.KnitCorrupt, list,
 
238
                          data.read_records_iter_raw(records))
 
239
 
 
240
 
118
241
class LowLevelKnitIndexTests(TestCase):
119
242
 
120
243
    def test_no_such_file(self):