~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Martin Pool
  • Date: 2005-05-30 01:39:13 UTC
  • Revision ID: mbp@sourcefrog.net-20050530013913-4ac43c29e1302170
- make sure any bzr output is flushed before 
  running external diff

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for Knit data structure"""
18
 
 
19
 
 
20
 
import difflib
21
 
 
22
 
 
23
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
 
from bzrlib.knit import (
25
 
    KnitVersionedFile,
26
 
    KnitPlainFactory,
27
 
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
29
 
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.transport import TransportLogger, get_transport
32
 
from bzrlib.transport.memory import MemoryTransport
33
 
from bzrlib.weave import Weave
34
 
 
35
 
 
36
 
class KnitTests(TestCaseWithTransport):
37
 
    """Class containing knit test helper routines."""
38
 
 
39
 
    def make_test_knit(self, annotate=False):
40
 
        if not annotate:
41
 
            factory = KnitPlainFactory()
42
 
        else:
43
 
            factory = None
44
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
45
 
 
46
 
 
47
 
class BasicKnitTests(KnitTests):
48
 
 
49
 
    def add_stock_one_and_one_a(self, k):
50
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
51
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
52
 
 
53
 
    def test_knit_constructor(self):
54
 
        """Construct empty k"""
55
 
        self.make_test_knit()
56
 
 
57
 
    def test_knit_add(self):
58
 
        """Store one text in knit and retrieve"""
59
 
        k = self.make_test_knit()
60
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
61
 
        self.assertTrue(k.has_version('text-1'))
62
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
63
 
 
64
 
    def test_knit_reload(self):
65
 
        # test that the content in a reloaded knit is correct
66
 
        k = self.make_test_knit()
67
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
68
 
        del k
69
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
 
        self.assertTrue(k2.has_version('text-1'))
71
 
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
72
 
 
73
 
    def test_knit_several(self):
74
 
        """Store several texts in a knit"""
75
 
        k = self.make_test_knit()
76
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
77
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
78
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
79
 
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
80
 
        
81
 
    def test_repeated_add(self):
82
 
        """Knit traps attempt to replace existing version"""
83
 
        k = self.make_test_knit()
84
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
85
 
        self.assertRaises(RevisionAlreadyPresent, 
86
 
                k.add_lines,
87
 
                'text-1', [], split_lines(TEXT_1))
88
 
 
89
 
    def test_empty(self):
90
 
        k = self.make_test_knit(True)
91
 
        k.add_lines('text-1', [], [])
92
 
        self.assertEquals(k.get_lines('text-1'), [])
93
 
 
94
 
    def test_incomplete(self):
95
 
        """Test if texts without a ending line-end can be inserted and
96
 
        extracted."""
97
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
98
 
        k.add_lines('text-1', [], ['a\n',    'b'  ])
99
 
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
100
 
        # reopening ensures maximum room for confusion
101
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
102
 
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
103
 
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
104
 
 
105
 
    def test_delta(self):
106
 
        """Expression of knit delta as lines"""
107
 
        k = self.make_test_knit()
108
 
        td = list(line_delta(TEXT_1.splitlines(True),
109
 
                             TEXT_1A.splitlines(True)))
110
 
        self.assertEqualDiff(''.join(td), delta_1_1a)
111
 
        out = apply_line_delta(TEXT_1.splitlines(True), td)
112
 
        self.assertEqualDiff(''.join(out), TEXT_1A)
113
 
 
114
 
    def test_add_with_parents(self):
115
 
        """Store in knit with parents"""
116
 
        k = self.make_test_knit()
117
 
        self.add_stock_one_and_one_a(k)
118
 
        self.assertEquals(k.get_parents('text-1'), [])
119
 
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
120
 
 
121
 
    def test_ancestry(self):
122
 
        """Store in knit with parents"""
123
 
        k = self.make_test_knit()
124
 
        self.add_stock_one_and_one_a(k)
125
 
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
126
 
 
127
 
    def test_add_delta(self):
128
 
        """Store in knit with parents"""
129
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
130
 
            delta=True, create=True)
131
 
        self.add_stock_one_and_one_a(k)
132
 
        k.clear_cache()
133
 
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
134
 
 
135
 
    def test_annotate(self):
136
 
        """Annotations"""
137
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
138
 
            delta=True, create=True)
139
 
        self.insert_and_test_small_annotate(k)
140
 
 
141
 
    def insert_and_test_small_annotate(self, k):
142
 
        """test annotation with k works correctly."""
143
 
        k.add_lines('text-1', [], ['a\n', 'b\n'])
144
 
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
145
 
 
146
 
        origins = k.annotate('text-2')
147
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
148
 
        self.assertEquals(origins[1], ('text-2', 'c\n'))
149
 
 
150
 
    def test_annotate_fulltext(self):
151
 
        """Annotations"""
152
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
153
 
            delta=False, create=True)
154
 
        self.insert_and_test_small_annotate(k)
155
 
 
156
 
    def test_annotate_merge_1(self):
157
 
        k = self.make_test_knit(True)
158
 
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
159
 
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
160
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
161
 
        origins = k.annotate('text-am')
162
 
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
163
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
164
 
 
165
 
    def test_annotate_merge_2(self):
166
 
        k = self.make_test_knit(True)
167
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
168
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
169
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
170
 
        origins = k.annotate('text-am')
171
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
172
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
173
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
174
 
 
175
 
    def test_annotate_merge_9(self):
176
 
        k = self.make_test_knit(True)
177
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
178
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
179
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
180
 
        origins = k.annotate('text-am')
181
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
182
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
183
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
184
 
 
185
 
    def test_annotate_merge_3(self):
186
 
        k = self.make_test_knit(True)
187
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
188
 
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
189
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
190
 
        origins = k.annotate('text-am')
191
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
192
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
193
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
194
 
 
195
 
    def test_annotate_merge_4(self):
196
 
        k = self.make_test_knit(True)
197
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
198
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
199
 
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
200
 
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
201
 
        origins = k.annotate('text-am')
202
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
203
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
204
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
205
 
 
206
 
    def test_annotate_merge_5(self):
207
 
        k = self.make_test_knit(True)
208
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
209
 
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
210
 
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
211
 
        k.add_lines('text-am',
212
 
                    ['text-a1', 'text-a2', 'text-a3'],
213
 
                    ['a\n', 'e\n', 'z\n'])
214
 
        origins = k.annotate('text-am')
215
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
216
 
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
217
 
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
218
 
 
219
 
    def test_annotate_file_cherry_pick(self):
220
 
        k = self.make_test_knit(True)
221
 
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
222
 
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
223
 
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
224
 
        origins = k.annotate('text-3')
225
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
226
 
        self.assertEquals(origins[1], ('text-1', 'b\n'))
227
 
        self.assertEquals(origins[2], ('text-1', 'c\n'))
228
 
 
229
 
    def test_knit_join(self):
230
 
        """Store in knit with parents"""
231
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
232
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
233
 
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
234
 
 
235
 
        k1.add_lines('text-c', [], split_lines(TEXT_1))
236
 
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
237
 
 
238
 
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
239
 
 
240
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
241
 
        count = k2.join(k1, version_ids=['text-m'])
242
 
        self.assertEquals(count, 5)
243
 
        self.assertTrue(k2.has_version('text-a'))
244
 
        self.assertTrue(k2.has_version('text-c'))
245
 
 
246
 
    def test_reannotate(self):
247
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
248
 
                               factory=KnitAnnotateFactory(), create=True)
249
 
        # 0
250
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
251
 
        # 1
252
 
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
253
 
 
254
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
255
 
                               factory=KnitAnnotateFactory(), create=True)
256
 
        k2.join(k1, version_ids=['text-b'])
257
 
 
258
 
        # 2
259
 
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
260
 
        # 2
261
 
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
262
 
        # 3
263
 
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
264
 
 
265
 
        # test-c will have index 3
266
 
        k1.join(k2, version_ids=['text-c'])
267
 
 
268
 
        lines = k1.get_lines('text-c')
269
 
        self.assertEquals(lines, ['z\n', 'c\n'])
270
 
 
271
 
        origins = k1.annotate('text-c')
272
 
        self.assertEquals(origins[0], ('text-c', 'z\n'))
273
 
        self.assertEquals(origins[1], ('text-b', 'c\n'))
274
 
 
275
 
    def test_get_line_delta_texts(self):
276
 
        """Make sure we can call get_texts on text with reused line deltas"""
277
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
278
 
                               factory=KnitPlainFactory(), create=True)
279
 
        for t in range(3):
280
 
            if t == 0:
281
 
                parents = []
282
 
            else:
283
 
                parents = ['%d' % (t-1)]
284
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
 
        k1.get_texts(('%d' % t) for t in range(3))
286
 
        
287
 
    def test_iter_lines_reads_in_order(self):
288
 
        t = MemoryTransport()
289
 
        instrumented_t = TransportLogger(t)
290
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
291
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
292
 
        # add texts with no required ordering
293
 
        k1.add_lines('base', [], ['text\n'])
294
 
        k1.add_lines('base2', [], ['text2\n'])
295
 
        k1.clear_cache()
296
 
        instrumented_t._calls = []
297
 
        # request a last-first iteration
298
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
299
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
300
 
        self.assertEqual(['text\n', 'text2\n'], results)
301
 
 
302
 
    def test_create_empty_annotated(self):
303
 
        k1 = self.make_test_knit(True)
304
 
        # 0
305
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
306
 
        k2 = k1.create_empty('t', MemoryTransport())
307
 
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
308
 
        self.assertEqual(k1.delta, k2.delta)
309
 
        # the generic test checks for empty content and file class
310
 
 
311
 
    def test_knit_format(self):
312
 
        # this tests that a new knit index file has the expected content
313
 
        # and that is writes the data we expect as records are added.
314
 
        knit = self.make_test_knit(True)
315
 
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
316
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
317
 
        self.assertFileEqual(
318
 
            "# bzr knit index 8\n"
319
 
            "\n"
320
 
            "revid fulltext 0 84 .a_ghost :",
321
 
            'test.kndx')
322
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
323
 
        self.assertFileEqual(
324
 
            "# bzr knit index 8\n"
325
 
            "\nrevid fulltext 0 84 .a_ghost :"
326
 
            "\nrevid2 line-delta 84 82 0 :",
327
 
            'test.kndx')
328
 
        # we should be able to load this file again
329
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
330
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
331
 
        # write a short write to the file and ensure that its ignored
332
 
        indexfile = file('test.kndx', 'at')
333
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
334
 
        indexfile.close()
335
 
        # we should be able to load this file again
336
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
337
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
338
 
        # and add a revision with the same id the failed write had
339
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
340
 
        # and when reading it revid3 should now appear.
341
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
342
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
343
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
344
 
 
345
 
    def test_plan_merge(self):
346
 
        my_knit = self.make_test_knit(annotate=True)
347
 
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
348
 
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
349
 
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
350
 
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
351
 
        for plan_line, expected_line in zip(plan, AB_MERGE):
352
 
            self.assertEqual(plan_line, expected_line)
353
 
 
354
 
 
355
 
TEXT_1 = """\
356
 
Banana cup cakes:
357
 
 
358
 
- bananas
359
 
- eggs
360
 
- broken tea cups
361
 
"""
362
 
 
363
 
TEXT_1A = """\
364
 
Banana cup cake recipe
365
 
(serves 6)
366
 
 
367
 
- bananas
368
 
- eggs
369
 
- broken tea cups
370
 
- self-raising flour
371
 
"""
372
 
 
373
 
TEXT_1B = """\
374
 
Banana cup cake recipe
375
 
 
376
 
- bananas (do not use plantains!!!)
377
 
- broken tea cups
378
 
- flour
379
 
"""
380
 
 
381
 
delta_1_1a = """\
382
 
0,1,2
383
 
Banana cup cake recipe
384
 
(serves 6)
385
 
5,5,1
386
 
- self-raising flour
387
 
"""
388
 
 
389
 
TEXT_2 = """\
390
 
Boeuf bourguignon
391
 
 
392
 
- beef
393
 
- red wine
394
 
- small onions
395
 
- carrot
396
 
- mushrooms
397
 
"""
398
 
 
399
 
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
400
 
new-a|(serves 6)
401
 
unchanged|
402
 
killed-b|- bananas
403
 
killed-b|- eggs
404
 
new-b|- bananas (do not use plantains!!!)
405
 
unchanged|- broken tea cups
406
 
new-a|- self-raising flour
407
 
new-b|- flour
408
 
"""
409
 
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
410
 
 
411
 
 
412
 
def line_delta(from_lines, to_lines):
413
 
    """Generate line-based delta from one text to another"""
414
 
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
415
 
    for op in s.get_opcodes():
416
 
        if op[0] == 'equal':
417
 
            continue
418
 
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
419
 
        for i in range(op[3], op[4]):
420
 
            yield to_lines[i]
421
 
 
422
 
 
423
 
def apply_line_delta(basis_lines, delta_lines):
424
 
    """Apply a line-based perfect diff
425
 
    
426
 
    basis_lines -- text to apply the patch to
427
 
    delta_lines -- diff instructions and content
428
 
    """
429
 
    out = basis_lines[:]
430
 
    i = 0
431
 
    offset = 0
432
 
    while i < len(delta_lines):
433
 
        l = delta_lines[i]
434
 
        a, b, c = map(long, l.split(','))
435
 
        i = i + 1
436
 
        out[offset+a:offset+b] = delta_lines[i:i+c]
437
 
        i = i + c
438
 
        offset = offset + (b - a) + c
439
 
    return out
440
 
 
441
 
 
442
 
class TestWeaveToKnit(KnitTests):
443
 
 
444
 
    def test_weave_to_knit_matches(self):
445
 
        # check that the WeaveToKnit is_compatible function
446
 
        # registers True for a Weave to a Knit.
447
 
        w = Weave()
448
 
        k = self.make_test_knit()
449
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
450
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
451
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
452
 
        self.failIf(WeaveToKnit.is_compatible(k, k))
453
 
 
454
 
 
455
 
class TestKnitCaching(KnitTests):
456
 
    
457
 
    def create_knit(self, cache_add=False):
458
 
        k = self.make_test_knit(True)
459
 
        if cache_add:
460
 
            k.enable_cache()
461
 
 
462
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
463
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
464
 
        return k
465
 
 
466
 
    def test_no_caching(self):
467
 
        k = self.create_knit()
468
 
        # Nothing should be cached without setting 'enable_cache'
469
 
        self.assertEqual({}, k._data._cache)
470
 
 
471
 
    def test_cache_add_and_clear(self):
472
 
        k = self.create_knit(True)
473
 
 
474
 
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
475
 
 
476
 
        k.clear_cache()
477
 
        self.assertEqual({}, k._data._cache)
478
 
 
479
 
    def test_cache_data_read_raw(self):
480
 
        k = self.create_knit()
481
 
 
482
 
        # Now cache and read
483
 
        k.enable_cache()
484
 
 
485
 
        def read_one_raw(version):
486
 
            pos_map = k._get_components_positions([version])
487
 
            method, pos, size, next = pos_map[version]
488
 
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
489
 
            self.assertEqual(1, len(lst))
490
 
            return lst[0]
491
 
 
492
 
        val = read_one_raw('text-1')
493
 
        self.assertEqual({'text-1':val[1]}, k._data._cache)
494
 
 
495
 
        k.clear_cache()
496
 
        # After clear, new reads are not cached
497
 
        self.assertEqual({}, k._data._cache)
498
 
 
499
 
        val2 = read_one_raw('text-1')
500
 
        self.assertEqual(val, val2)
501
 
        self.assertEqual({}, k._data._cache)
502
 
 
503
 
    def test_cache_data_read(self):
504
 
        k = self.create_knit()
505
 
 
506
 
        def read_one(version):
507
 
            pos_map = k._get_components_positions([version])
508
 
            method, pos, size, next = pos_map[version]
509
 
            lst = list(k._data.read_records_iter([(version, pos, size)]))
510
 
            self.assertEqual(1, len(lst))
511
 
            return lst[0]
512
 
 
513
 
        # Now cache and read
514
 
        k.enable_cache()
515
 
 
516
 
        val = read_one('text-2')
517
 
        self.assertEqual(['text-2'], k._data._cache.keys())
518
 
        self.assertEqual('text-2', val[0])
519
 
        content, digest = k._data._parse_record('text-2',
520
 
                                                k._data._cache['text-2'])
521
 
        self.assertEqual(content, val[1])
522
 
        self.assertEqual(digest, val[2])
523
 
 
524
 
        k.clear_cache()
525
 
        self.assertEqual({}, k._data._cache)
526
 
 
527
 
        val2 = read_one('text-2')
528
 
        self.assertEqual(val, val2)
529
 
        self.assertEqual({}, k._data._cache)
530
 
 
531
 
    def test_cache_read(self):
532
 
        k = self.create_knit()
533
 
        k.enable_cache()
534
 
 
535
 
        text = k.get_text('text-1')
536
 
        self.assertEqual(TEXT_1, text)
537
 
        self.assertEqual(['text-1'], k._data._cache.keys())
538
 
 
539
 
        k.clear_cache()
540
 
        self.assertEqual({}, k._data._cache)
541
 
 
542
 
        text = k.get_text('text-1')
543
 
        self.assertEqual(TEXT_1, text)
544
 
        self.assertEqual({}, k._data._cache)