110
113
return StringIO("\n".join(self.file_lines))
115
def readv(self, relpath, offsets):
    """Yield ``(offset, data)`` for each requested range of ``relpath``.

    :param relpath: Name handed to ``self.get`` to obtain a file-like
        object.
    :param offsets: Iterable of ``(offset, size)`` pairs.
    :return: A generator of ``(offset, data)`` tuples in request order,
        where ``data`` is whatever ``read(size)`` returns at that offset.
    """
    fp = self.get(relpath)
    for offset, size in offsets:
        # Position explicitly: without the seek each read simply continues
        # from where the previous one stopped, so non-contiguous or
        # out-of-order requests would be answered with the wrong bytes.
        fp.seek(offset)
        yield offset, fp.read(size)
112
121
def __getattr__(self, name):
113
122
def queue_call(*args, **kwargs):
114
123
self.calls.append((name, args, kwargs))
115
124
return queue_call
127
class LowLevelKnitDataTests(TestCase):
129
def create_gz_content(self, text):
    """Return ``text`` compressed as a complete gzip stream.

    :param text: The byte string to compress.
    :return: The gzip-encoded bytes collected from an in-memory buffer.
    """
    sio = StringIO()
    gz_file = gzip.GzipFile(mode='wb', fileobj=sio)
    try:
        gz_file.write(text)
    finally:
        # close() flushes the remaining compressed data and writes the
        # gzip trailer; it leaves ``sio`` open so getvalue() still works.
        gz_file.close()
    return sio.getvalue()
136
def test_valid_knit_data(self):
137
sha1sum = sha.new('foo\nbar\n').hexdigest()
138
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
143
transport = MockTransport([gz_txt])
144
data = _KnitData(transport, 'filename', mode='r')
145
records = [('rev-id-1', 0, len(gz_txt))]
147
contents = data.read_records(records)
148
self.assertEqual({'rev-id-1':(['foo\n', 'bar\n'], sha1sum)}, contents)
150
raw_contents = list(data.read_records_iter_raw(records))
151
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
153
def test_not_enough_lines(self):
154
sha1sum = sha.new('foo\n').hexdigest()
155
# record says 2 lines, data says 1
156
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
160
transport = MockTransport([gz_txt])
161
data = _KnitData(transport, 'filename', mode='r')
162
records = [('rev-id-1', 0, len(gz_txt))]
163
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
165
# read_records_iter_raw won't detect that sort of mismatch/corruption
166
raw_contents = list(data.read_records_iter_raw(records))
167
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
169
def test_too_many_lines(self):
170
sha1sum = sha.new('foo\nbar\n').hexdigest()
171
# record says 1 line, data says 2
172
gz_txt = self.create_gz_content('version rev-id-1 1 %s\n'
177
transport = MockTransport([gz_txt])
178
data = _KnitData(transport, 'filename', mode='r')
179
records = [('rev-id-1', 0, len(gz_txt))]
180
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
182
# read_records_iter_raw won't detect that sort of mismatch/corruption
183
raw_contents = list(data.read_records_iter_raw(records))
184
self.assertEqual([('rev-id-1', gz_txt)], raw_contents)
186
def test_mismatched_version_id(self):
187
sha1sum = sha.new('foo\nbar\n').hexdigest()
188
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
193
transport = MockTransport([gz_txt])
194
data = _KnitData(transport, 'filename', mode='r')
195
# We are asking for rev-id-2, but the data is rev-id-1
196
records = [('rev-id-2', 0, len(gz_txt))]
197
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
199
# read_records_iter_raw will notice if we request the wrong version.
200
self.assertRaises(errors.KnitCorrupt, list,
201
data.read_records_iter_raw(records))
203
def test_uncompressed_data(self):
204
sha1sum = sha.new('foo\nbar\n').hexdigest()
205
txt = ('version rev-id-1 2 %s\n'
210
transport = MockTransport([txt])
211
data = _KnitData(transport, 'filename', mode='r')
212
records = [('rev-id-1', 0, len(txt))]
214
# We don't have valid gzip data ==> corrupt
215
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
217
# read_records_iter_raw will notice the bad data
218
self.assertRaises(errors.KnitCorrupt, list,
219
data.read_records_iter_raw(records))
221
def test_corrupted_data(self):
222
sha1sum = sha.new('foo\nbar\n').hexdigest()
223
gz_txt = self.create_gz_content('version rev-id-1 2 %s\n'
228
# Change 2 bytes in the middle to \xff
229
gz_txt = gz_txt[:10] + '\xff\xff' + gz_txt[12:]
230
transport = MockTransport([gz_txt])
231
data = _KnitData(transport, 'filename', mode='r')
232
records = [('rev-id-1', 0, len(gz_txt))]
234
self.assertRaises(errors.KnitCorrupt, data.read_records, records)
236
# read_records_iter_raw will notice the corrupted data.
237
self.assertRaises(errors.KnitCorrupt, list,
238
data.read_records_iter_raw(records))
118
241
class LowLevelKnitIndexTests(TestCase):
120
243
def test_no_such_file(self):