~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Martin Pool
  • Date: 2005-09-13 02:11:41 UTC
  • Revision ID: mbp@sourcefrog.net-20050913021141-263bfc2655ac3ed2
- store inventories in weave

- put more intelligence into WeaveStore

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for Knit data structure"""
18
 
 
19
 
 
20
 
import difflib
21
 
 
22
 
 
23
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
24
 
from bzrlib.knit import (
25
 
    KnitVersionedFile,
26
 
    KnitPlainFactory,
27
 
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
29
 
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.transport import TransportLogger, get_transport
32
 
from bzrlib.transport.memory import MemoryTransport
33
 
from bzrlib.weave import Weave
34
 
 
35
 
 
36
 
class KnitTests(TestCaseWithTransport):
37
 
    """Class containing knit test helper routines."""
38
 
 
39
 
    def make_test_knit(self, annotate=False, delay_create=False):
40
 
        if not annotate:
41
 
            factory = KnitPlainFactory()
42
 
        else:
43
 
            factory = None
44
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
45
 
                                 factory=factory, create=True,
46
 
                                 delay_create=delay_create)
47
 
 
48
 
 
49
 
class BasicKnitTests(KnitTests):
50
 
 
51
 
    def add_stock_one_and_one_a(self, k):
52
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
53
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
54
 
 
55
 
    def test_knit_constructor(self):
56
 
        """Construct empty k"""
57
 
        self.make_test_knit()
58
 
 
59
 
    def test_knit_add(self):
60
 
        """Store one text in knit and retrieve"""
61
 
        k = self.make_test_knit()
62
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
63
 
        self.assertTrue(k.has_version('text-1'))
64
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
65
 
 
66
 
    def test_knit_reload(self):
67
 
        # test that the content in a reloaded knit is correct
68
 
        k = self.make_test_knit()
69
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
70
 
        del k
71
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
72
 
        self.assertTrue(k2.has_version('text-1'))
73
 
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
74
 
 
75
 
    def test_knit_several(self):
76
 
        """Store several texts in a knit"""
77
 
        k = self.make_test_knit()
78
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
79
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
80
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
81
 
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
82
 
        
83
 
    def test_repeated_add(self):
84
 
        """Knit traps attempt to replace existing version"""
85
 
        k = self.make_test_knit()
86
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
87
 
        self.assertRaises(RevisionAlreadyPresent, 
88
 
                k.add_lines,
89
 
                'text-1', [], split_lines(TEXT_1))
90
 
 
91
 
    def test_empty(self):
92
 
        k = self.make_test_knit(True)
93
 
        k.add_lines('text-1', [], [])
94
 
        self.assertEquals(k.get_lines('text-1'), [])
95
 
 
96
 
    def test_incomplete(self):
97
 
        """Test if texts without a ending line-end can be inserted and
98
 
        extracted."""
99
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
100
 
        k.add_lines('text-1', [], ['a\n',    'b'  ])
101
 
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
102
 
        # reopening ensures maximum room for confusion
103
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
104
 
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
105
 
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
106
 
 
107
 
    def test_delta(self):
108
 
        """Expression of knit delta as lines"""
109
 
        k = self.make_test_knit()
110
 
        td = list(line_delta(TEXT_1.splitlines(True),
111
 
                             TEXT_1A.splitlines(True)))
112
 
        self.assertEqualDiff(''.join(td), delta_1_1a)
113
 
        out = apply_line_delta(TEXT_1.splitlines(True), td)
114
 
        self.assertEqualDiff(''.join(out), TEXT_1A)
115
 
 
116
 
    def test_add_with_parents(self):
117
 
        """Store in knit with parents"""
118
 
        k = self.make_test_knit()
119
 
        self.add_stock_one_and_one_a(k)
120
 
        self.assertEquals(k.get_parents('text-1'), [])
121
 
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
122
 
 
123
 
    def test_ancestry(self):
124
 
        """Store in knit with parents"""
125
 
        k = self.make_test_knit()
126
 
        self.add_stock_one_and_one_a(k)
127
 
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
128
 
 
129
 
    def test_add_delta(self):
130
 
        """Store in knit with parents"""
131
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
132
 
            delta=True, create=True)
133
 
        self.add_stock_one_and_one_a(k)
134
 
        k.clear_cache()
135
 
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
136
 
 
137
 
    def test_annotate(self):
138
 
        """Annotations"""
139
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
140
 
            delta=True, create=True)
141
 
        self.insert_and_test_small_annotate(k)
142
 
 
143
 
    def insert_and_test_small_annotate(self, k):
144
 
        """test annotation with k works correctly."""
145
 
        k.add_lines('text-1', [], ['a\n', 'b\n'])
146
 
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
147
 
 
148
 
        origins = k.annotate('text-2')
149
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
150
 
        self.assertEquals(origins[1], ('text-2', 'c\n'))
151
 
 
152
 
    def test_annotate_fulltext(self):
153
 
        """Annotations"""
154
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
155
 
            delta=False, create=True)
156
 
        self.insert_and_test_small_annotate(k)
157
 
 
158
 
    def test_annotate_merge_1(self):
159
 
        k = self.make_test_knit(True)
160
 
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
161
 
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
162
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
163
 
        origins = k.annotate('text-am')
164
 
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
165
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
166
 
 
167
 
    def test_annotate_merge_2(self):
168
 
        k = self.make_test_knit(True)
169
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
170
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
171
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
172
 
        origins = k.annotate('text-am')
173
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
174
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
175
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
176
 
 
177
 
    def test_annotate_merge_9(self):
178
 
        k = self.make_test_knit(True)
179
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
180
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
181
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
182
 
        origins = k.annotate('text-am')
183
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
184
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
185
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
186
 
 
187
 
    def test_annotate_merge_3(self):
188
 
        k = self.make_test_knit(True)
189
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
190
 
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
191
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
192
 
        origins = k.annotate('text-am')
193
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
194
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
195
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
196
 
 
197
 
    def test_annotate_merge_4(self):
198
 
        k = self.make_test_knit(True)
199
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
200
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
201
 
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
202
 
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
203
 
        origins = k.annotate('text-am')
204
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
205
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
206
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
207
 
 
208
 
    def test_annotate_merge_5(self):
209
 
        k = self.make_test_knit(True)
210
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
211
 
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
212
 
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
213
 
        k.add_lines('text-am',
214
 
                    ['text-a1', 'text-a2', 'text-a3'],
215
 
                    ['a\n', 'e\n', 'z\n'])
216
 
        origins = k.annotate('text-am')
217
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
218
 
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
219
 
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
220
 
 
221
 
    def test_annotate_file_cherry_pick(self):
222
 
        k = self.make_test_knit(True)
223
 
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
224
 
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
225
 
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
226
 
        origins = k.annotate('text-3')
227
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
228
 
        self.assertEquals(origins[1], ('text-1', 'b\n'))
229
 
        self.assertEquals(origins[2], ('text-1', 'c\n'))
230
 
 
231
 
    def test_knit_join(self):
232
 
        """Store in knit with parents"""
233
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
234
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
235
 
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
236
 
 
237
 
        k1.add_lines('text-c', [], split_lines(TEXT_1))
238
 
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
239
 
 
240
 
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
241
 
 
242
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
243
 
        count = k2.join(k1, version_ids=['text-m'])
244
 
        self.assertEquals(count, 5)
245
 
        self.assertTrue(k2.has_version('text-a'))
246
 
        self.assertTrue(k2.has_version('text-c'))
247
 
 
248
 
    def test_reannotate(self):
249
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
250
 
                               factory=KnitAnnotateFactory(), create=True)
251
 
        # 0
252
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
253
 
        # 1
254
 
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
255
 
 
256
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
257
 
                               factory=KnitAnnotateFactory(), create=True)
258
 
        k2.join(k1, version_ids=['text-b'])
259
 
 
260
 
        # 2
261
 
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
262
 
        # 2
263
 
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
264
 
        # 3
265
 
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
266
 
 
267
 
        # test-c will have index 3
268
 
        k1.join(k2, version_ids=['text-c'])
269
 
 
270
 
        lines = k1.get_lines('text-c')
271
 
        self.assertEquals(lines, ['z\n', 'c\n'])
272
 
 
273
 
        origins = k1.annotate('text-c')
274
 
        self.assertEquals(origins[0], ('text-c', 'z\n'))
275
 
        self.assertEquals(origins[1], ('text-b', 'c\n'))
276
 
 
277
 
    def test_get_line_delta_texts(self):
278
 
        """Make sure we can call get_texts on text with reused line deltas"""
279
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
280
 
                               factory=KnitPlainFactory(), create=True)
281
 
        for t in range(3):
282
 
            if t == 0:
283
 
                parents = []
284
 
            else:
285
 
                parents = ['%d' % (t-1)]
286
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
287
 
        k1.get_texts(('%d' % t) for t in range(3))
288
 
        
289
 
    def test_iter_lines_reads_in_order(self):
290
 
        t = MemoryTransport()
291
 
        instrumented_t = TransportLogger(t)
292
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
293
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
294
 
        # add texts with no required ordering
295
 
        k1.add_lines('base', [], ['text\n'])
296
 
        k1.add_lines('base2', [], ['text2\n'])
297
 
        k1.clear_cache()
298
 
        instrumented_t._calls = []
299
 
        # request a last-first iteration
300
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
301
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
302
 
        self.assertEqual(['text\n', 'text2\n'], results)
303
 
 
304
 
    def test_create_empty_annotated(self):
305
 
        k1 = self.make_test_knit(True)
306
 
        # 0
307
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
308
 
        k2 = k1.create_empty('t', MemoryTransport())
309
 
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
310
 
        self.assertEqual(k1.delta, k2.delta)
311
 
        # the generic test checks for empty content and file class
312
 
 
313
 
    def test_knit_format(self):
314
 
        # this tests that a new knit index file has the expected content
315
 
        # and that is writes the data we expect as records are added.
316
 
        knit = self.make_test_knit(True)
317
 
        # Now knit files are not created until we first add data to them
318
 
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
319
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
320
 
        self.assertFileEqual(
321
 
            "# bzr knit index 8\n"
322
 
            "\n"
323
 
            "revid fulltext 0 84 .a_ghost :",
324
 
            'test.kndx')
325
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
326
 
        self.assertFileEqual(
327
 
            "# bzr knit index 8\n"
328
 
            "\nrevid fulltext 0 84 .a_ghost :"
329
 
            "\nrevid2 line-delta 84 82 0 :",
330
 
            'test.kndx')
331
 
        # we should be able to load this file again
332
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
333
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
334
 
        # write a short write to the file and ensure that its ignored
335
 
        indexfile = file('test.kndx', 'at')
336
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
337
 
        indexfile.close()
338
 
        # we should be able to load this file again
339
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
340
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
341
 
        # and add a revision with the same id the failed write had
342
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
343
 
        # and when reading it revid3 should now appear.
344
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
345
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
346
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
347
 
 
348
 
    def test_delay_create(self):
349
 
        """Test that passing delay_create=True creates files late"""
350
 
        knit = self.make_test_knit(annotate=True, delay_create=True)
351
 
        self.failIfExists('test.knit')
352
 
        self.failIfExists('test.kndx')
353
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
354
 
        self.failUnlessExists('test.knit')
355
 
        self.assertFileEqual(
356
 
            "# bzr knit index 8\n"
357
 
            "\n"
358
 
            "revid fulltext 0 84 .a_ghost :",
359
 
            'test.kndx')
360
 
 
361
 
    def test_create_parent_dir(self):
362
 
        """create_parent_dir can create knits in nonexistant dirs"""
363
 
        # Has no effect if we don't set 'delay_create'
364
 
        trans = get_transport('.')
365
 
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
366
 
                          trans, access_mode='w', factory=None,
367
 
                          create=True, create_parent_dir=True)
368
 
        # Nothing should have changed yet
369
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
370
 
                                 factory=None, create=True,
371
 
                                 create_parent_dir=True,
372
 
                                 delay_create=True)
373
 
        self.failIfExists('dir/test.knit')
374
 
        self.failIfExists('dir/test.kndx')
375
 
        self.failIfExists('dir')
376
 
        knit.add_lines('revid', [], ['a\n'])
377
 
        self.failUnlessExists('dir')
378
 
        self.failUnlessExists('dir/test.knit')
379
 
        self.assertFileEqual(
380
 
            "# bzr knit index 8\n"
381
 
            "\n"
382
 
            "revid fulltext 0 84  :",
383
 
            'dir/test.kndx')
384
 
 
385
 
    def test_create_mode_700(self):
386
 
        trans = get_transport('.')
387
 
        if not trans._can_roundtrip_unix_modebits():
388
 
            # Can't roundtrip, so no need to run this test
389
 
            return
390
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
391
 
                                 factory=None, create=True,
392
 
                                 create_parent_dir=True,
393
 
                                 delay_create=True,
394
 
                                 file_mode=0600,
395
 
                                 dir_mode=0700)
396
 
        knit.add_lines('revid', [], ['a\n'])
397
 
        self.assertTransportMode(trans, 'dir', 0700)
398
 
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
399
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
400
 
 
401
 
    def test_create_mode_770(self):
402
 
        trans = get_transport('.')
403
 
        if not trans._can_roundtrip_unix_modebits():
404
 
            # Can't roundtrip, so no need to run this test
405
 
            return
406
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
407
 
                                 factory=None, create=True,
408
 
                                 create_parent_dir=True,
409
 
                                 delay_create=True,
410
 
                                 file_mode=0660,
411
 
                                 dir_mode=0770)
412
 
        knit.add_lines('revid', [], ['a\n'])
413
 
        self.assertTransportMode(trans, 'dir', 0770)
414
 
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
415
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
416
 
 
417
 
    def test_create_mode_777(self):
418
 
        trans = get_transport('.')
419
 
        if not trans._can_roundtrip_unix_modebits():
420
 
            # Can't roundtrip, so no need to run this test
421
 
            return
422
 
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
423
 
                                 factory=None, create=True,
424
 
                                 create_parent_dir=True,
425
 
                                 delay_create=True,
426
 
                                 file_mode=0666,
427
 
                                 dir_mode=0777)
428
 
        knit.add_lines('revid', [], ['a\n'])
429
 
        self.assertTransportMode(trans, 'dir', 0777)
430
 
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
431
 
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
432
 
 
433
 
    def test_plan_merge(self):
434
 
        my_knit = self.make_test_knit(annotate=True)
435
 
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
436
 
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
437
 
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
438
 
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
439
 
        for plan_line, expected_line in zip(plan, AB_MERGE):
440
 
            self.assertEqual(plan_line, expected_line)
441
 
 
442
 
 
443
 
TEXT_1 = """\
444
 
Banana cup cakes:
445
 
 
446
 
- bananas
447
 
- eggs
448
 
- broken tea cups
449
 
"""
450
 
 
451
 
TEXT_1A = """\
452
 
Banana cup cake recipe
453
 
(serves 6)
454
 
 
455
 
- bananas
456
 
- eggs
457
 
- broken tea cups
458
 
- self-raising flour
459
 
"""
460
 
 
461
 
TEXT_1B = """\
462
 
Banana cup cake recipe
463
 
 
464
 
- bananas (do not use plantains!!!)
465
 
- broken tea cups
466
 
- flour
467
 
"""
468
 
 
469
 
delta_1_1a = """\
470
 
0,1,2
471
 
Banana cup cake recipe
472
 
(serves 6)
473
 
5,5,1
474
 
- self-raising flour
475
 
"""
476
 
 
477
 
TEXT_2 = """\
478
 
Boeuf bourguignon
479
 
 
480
 
- beef
481
 
- red wine
482
 
- small onions
483
 
- carrot
484
 
- mushrooms
485
 
"""
486
 
 
487
 
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
488
 
new-a|(serves 6)
489
 
unchanged|
490
 
killed-b|- bananas
491
 
killed-b|- eggs
492
 
new-b|- bananas (do not use plantains!!!)
493
 
unchanged|- broken tea cups
494
 
new-a|- self-raising flour
495
 
new-b|- flour
496
 
"""
497
 
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
498
 
 
499
 
 
500
 
def line_delta(from_lines, to_lines):
501
 
    """Generate line-based delta from one text to another"""
502
 
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
503
 
    for op in s.get_opcodes():
504
 
        if op[0] == 'equal':
505
 
            continue
506
 
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
507
 
        for i in range(op[3], op[4]):
508
 
            yield to_lines[i]
509
 
 
510
 
 
511
 
def apply_line_delta(basis_lines, delta_lines):
512
 
    """Apply a line-based perfect diff
513
 
    
514
 
    basis_lines -- text to apply the patch to
515
 
    delta_lines -- diff instructions and content
516
 
    """
517
 
    out = basis_lines[:]
518
 
    i = 0
519
 
    offset = 0
520
 
    while i < len(delta_lines):
521
 
        l = delta_lines[i]
522
 
        a, b, c = map(long, l.split(','))
523
 
        i = i + 1
524
 
        out[offset+a:offset+b] = delta_lines[i:i+c]
525
 
        i = i + c
526
 
        offset = offset + (b - a) + c
527
 
    return out
528
 
 
529
 
 
530
 
class TestWeaveToKnit(KnitTests):
531
 
 
532
 
    def test_weave_to_knit_matches(self):
533
 
        # check that the WeaveToKnit is_compatible function
534
 
        # registers True for a Weave to a Knit.
535
 
        w = Weave()
536
 
        k = self.make_test_knit()
537
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
538
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
539
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
540
 
        self.failIf(WeaveToKnit.is_compatible(k, k))
541
 
 
542
 
 
543
 
class TestKnitCaching(KnitTests):
544
 
    
545
 
    def create_knit(self, cache_add=False):
546
 
        k = self.make_test_knit(True)
547
 
        if cache_add:
548
 
            k.enable_cache()
549
 
 
550
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
551
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
552
 
        return k
553
 
 
554
 
    def test_no_caching(self):
555
 
        k = self.create_knit()
556
 
        # Nothing should be cached without setting 'enable_cache'
557
 
        self.assertEqual({}, k._data._cache)
558
 
 
559
 
    def test_cache_add_and_clear(self):
560
 
        k = self.create_knit(True)
561
 
 
562
 
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
563
 
 
564
 
        k.clear_cache()
565
 
        self.assertEqual({}, k._data._cache)
566
 
 
567
 
    def test_cache_data_read_raw(self):
568
 
        k = self.create_knit()
569
 
 
570
 
        # Now cache and read
571
 
        k.enable_cache()
572
 
 
573
 
        def read_one_raw(version):
574
 
            pos_map = k._get_components_positions([version])
575
 
            method, pos, size, next = pos_map[version]
576
 
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
577
 
            self.assertEqual(1, len(lst))
578
 
            return lst[0]
579
 
 
580
 
        val = read_one_raw('text-1')
581
 
        self.assertEqual({'text-1':val[1]}, k._data._cache)
582
 
 
583
 
        k.clear_cache()
584
 
        # After clear, new reads are not cached
585
 
        self.assertEqual({}, k._data._cache)
586
 
 
587
 
        val2 = read_one_raw('text-1')
588
 
        self.assertEqual(val, val2)
589
 
        self.assertEqual({}, k._data._cache)
590
 
 
591
 
    def test_cache_data_read(self):
592
 
        k = self.create_knit()
593
 
 
594
 
        def read_one(version):
595
 
            pos_map = k._get_components_positions([version])
596
 
            method, pos, size, next = pos_map[version]
597
 
            lst = list(k._data.read_records_iter([(version, pos, size)]))
598
 
            self.assertEqual(1, len(lst))
599
 
            return lst[0]
600
 
 
601
 
        # Now cache and read
602
 
        k.enable_cache()
603
 
 
604
 
        val = read_one('text-2')
605
 
        self.assertEqual(['text-2'], k._data._cache.keys())
606
 
        self.assertEqual('text-2', val[0])
607
 
        content, digest = k._data._parse_record('text-2',
608
 
                                                k._data._cache['text-2'])
609
 
        self.assertEqual(content, val[1])
610
 
        self.assertEqual(digest, val[2])
611
 
 
612
 
        k.clear_cache()
613
 
        self.assertEqual({}, k._data._cache)
614
 
 
615
 
        val2 = read_one('text-2')
616
 
        self.assertEqual(val, val2)
617
 
        self.assertEqual({}, k._data._cache)
618
 
 
619
 
    def test_cache_read(self):
620
 
        k = self.create_knit()
621
 
        k.enable_cache()
622
 
 
623
 
        text = k.get_text('text-1')
624
 
        self.assertEqual(TEXT_1, text)
625
 
        self.assertEqual(['text-1'], k._data._cache.keys())
626
 
 
627
 
        k.clear_cache()
628
 
        self.assertEqual({}, k._data._cache)
629
 
 
630
 
        text = k.get_text('text-1')
631
 
        self.assertEqual(TEXT_1, text)
632
 
        self.assertEqual({}, k._data._cache)