~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Aaron Bentley
  • Date: 2006-11-10 01:55:55 UTC
  • mto: This revision was merged to the branch mainline in revision 2127.
  • Revision ID: aaron.bentley@utoronto.ca-20061110015555-f48202744b630209
Ignore html docs (both kinds)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005, 2006 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
"""Tests for Knit data structure"""
 
18
 
 
19
 
 
20
import difflib
 
21
 
 
22
 
 
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent, NoSuchFile
 
24
from bzrlib.knit import (
 
25
    KnitVersionedFile,
 
26
    KnitPlainFactory,
 
27
    KnitAnnotateFactory,
 
28
    WeaveToKnit)
 
29
from bzrlib.osutils import split_lines
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.transport import TransportLogger, get_transport
 
32
from bzrlib.transport.memory import MemoryTransport
 
33
from bzrlib.weave import Weave
 
34
 
 
35
 
 
36
class KnitTests(TestCaseWithTransport):
 
37
    """Class containing knit test helper routines."""
 
38
 
 
39
    def make_test_knit(self, annotate=False, delay_create=False):
 
40
        if not annotate:
 
41
            factory = KnitPlainFactory()
 
42
        else:
 
43
            factory = None
 
44
        return KnitVersionedFile('test', get_transport('.'), access_mode='w',
 
45
                                 factory=factory, create=True,
 
46
                                 delay_create=delay_create)
 
47
 
 
48
 
 
49
class BasicKnitTests(KnitTests):
 
50
 
 
51
    def add_stock_one_and_one_a(self, k):
 
52
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
53
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
54
 
 
55
    def test_knit_constructor(self):
 
56
        """Construct empty k"""
 
57
        self.make_test_knit()
 
58
 
 
59
    def test_knit_add(self):
 
60
        """Store one text in knit and retrieve"""
 
61
        k = self.make_test_knit()
 
62
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
63
        self.assertTrue(k.has_version('text-1'))
 
64
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
65
 
 
66
    def test_knit_reload(self):
 
67
        # test that the content in a reloaded knit is correct
 
68
        k = self.make_test_knit()
 
69
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
70
        del k
 
71
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
72
        self.assertTrue(k2.has_version('text-1'))
 
73
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
 
74
 
 
75
    def test_knit_several(self):
 
76
        """Store several texts in a knit"""
 
77
        k = self.make_test_knit()
 
78
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
79
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
80
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
 
81
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
 
82
        
 
83
    def test_repeated_add(self):
 
84
        """Knit traps attempt to replace existing version"""
 
85
        k = self.make_test_knit()
 
86
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
87
        self.assertRaises(RevisionAlreadyPresent, 
 
88
                k.add_lines,
 
89
                'text-1', [], split_lines(TEXT_1))
 
90
 
 
91
    def test_empty(self):
 
92
        k = self.make_test_knit(True)
 
93
        k.add_lines('text-1', [], [])
 
94
        self.assertEquals(k.get_lines('text-1'), [])
 
95
 
 
96
    def test_incomplete(self):
 
97
        """Test if texts without a ending line-end can be inserted and
 
98
        extracted."""
 
99
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
100
        k.add_lines('text-1', [], ['a\n',    'b'  ])
 
101
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
 
102
        # reopening ensures maximum room for confusion
 
103
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
104
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
 
105
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
 
106
 
 
107
    def test_delta(self):
 
108
        """Expression of knit delta as lines"""
 
109
        k = self.make_test_knit()
 
110
        td = list(line_delta(TEXT_1.splitlines(True),
 
111
                             TEXT_1A.splitlines(True)))
 
112
        self.assertEqualDiff(''.join(td), delta_1_1a)
 
113
        out = apply_line_delta(TEXT_1.splitlines(True), td)
 
114
        self.assertEqualDiff(''.join(out), TEXT_1A)
 
115
 
 
116
    def test_add_with_parents(self):
 
117
        """Store in knit with parents"""
 
118
        k = self.make_test_knit()
 
119
        self.add_stock_one_and_one_a(k)
 
120
        self.assertEquals(k.get_parents('text-1'), [])
 
121
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
 
122
 
 
123
    def test_ancestry(self):
 
124
        """Store in knit with parents"""
 
125
        k = self.make_test_knit()
 
126
        self.add_stock_one_and_one_a(k)
 
127
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
 
128
 
 
129
    def test_add_delta(self):
 
130
        """Store in knit with parents"""
 
131
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
132
            delta=True, create=True)
 
133
        self.add_stock_one_and_one_a(k)
 
134
        k.clear_cache()
 
135
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
 
136
 
 
137
    def test_annotate(self):
 
138
        """Annotations"""
 
139
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
140
            delta=True, create=True)
 
141
        self.insert_and_test_small_annotate(k)
 
142
 
 
143
    def insert_and_test_small_annotate(self, k):
 
144
        """test annotation with k works correctly."""
 
145
        k.add_lines('text-1', [], ['a\n', 'b\n'])
 
146
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
 
147
 
 
148
        origins = k.annotate('text-2')
 
149
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
150
        self.assertEquals(origins[1], ('text-2', 'c\n'))
 
151
 
 
152
    def test_annotate_fulltext(self):
 
153
        """Annotations"""
 
154
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
155
            delta=False, create=True)
 
156
        self.insert_and_test_small_annotate(k)
 
157
 
 
158
    def test_annotate_merge_1(self):
 
159
        k = self.make_test_knit(True)
 
160
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
 
161
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
 
162
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
 
163
        origins = k.annotate('text-am')
 
164
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
 
165
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
166
 
 
167
    def test_annotate_merge_2(self):
 
168
        k = self.make_test_knit(True)
 
169
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
170
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
171
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
 
172
        origins = k.annotate('text-am')
 
173
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
174
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
175
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
176
 
 
177
    def test_annotate_merge_9(self):
 
178
        k = self.make_test_knit(True)
 
179
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
180
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
181
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
 
182
        origins = k.annotate('text-am')
 
183
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
184
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
185
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
 
186
 
 
187
    def test_annotate_merge_3(self):
 
188
        k = self.make_test_knit(True)
 
189
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
190
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
 
191
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
 
192
        origins = k.annotate('text-am')
 
193
        self.assertEquals(origins[0], ('text-am', 'k\n'))
 
194
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
 
195
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
196
 
 
197
    def test_annotate_merge_4(self):
 
198
        k = self.make_test_knit(True)
 
199
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
200
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
 
201
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
 
202
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
 
203
        origins = k.annotate('text-am')
 
204
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
205
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
 
206
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
 
207
 
 
208
    def test_annotate_merge_5(self):
 
209
        k = self.make_test_knit(True)
 
210
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
 
211
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
 
212
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
 
213
        k.add_lines('text-am',
 
214
                    ['text-a1', 'text-a2', 'text-a3'],
 
215
                    ['a\n', 'e\n', 'z\n'])
 
216
        origins = k.annotate('text-am')
 
217
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
 
218
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
 
219
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
 
220
 
 
221
    def test_annotate_file_cherry_pick(self):
 
222
        k = self.make_test_knit(True)
 
223
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
 
224
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
 
225
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
 
226
        origins = k.annotate('text-3')
 
227
        self.assertEquals(origins[0], ('text-1', 'a\n'))
 
228
        self.assertEquals(origins[1], ('text-1', 'b\n'))
 
229
        self.assertEquals(origins[2], ('text-1', 'c\n'))
 
230
 
 
231
    def test_knit_join(self):
 
232
        """Store in knit with parents"""
 
233
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
234
        k1.add_lines('text-a', [], split_lines(TEXT_1))
 
235
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
 
236
 
 
237
        k1.add_lines('text-c', [], split_lines(TEXT_1))
 
238
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
 
239
 
 
240
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
 
241
 
 
242
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
243
        count = k2.join(k1, version_ids=['text-m'])
 
244
        self.assertEquals(count, 5)
 
245
        self.assertTrue(k2.has_version('text-a'))
 
246
        self.assertTrue(k2.has_version('text-c'))
 
247
 
 
248
    def test_reannotate(self):
 
249
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
250
                               factory=KnitAnnotateFactory(), create=True)
 
251
        # 0
 
252
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
253
        # 1
 
254
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
 
255
 
 
256
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
257
                               factory=KnitAnnotateFactory(), create=True)
 
258
        k2.join(k1, version_ids=['text-b'])
 
259
 
 
260
        # 2
 
261
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
 
262
        # 2
 
263
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
 
264
        # 3
 
265
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
 
266
 
 
267
        # test-c will have index 3
 
268
        k1.join(k2, version_ids=['text-c'])
 
269
 
 
270
        lines = k1.get_lines('text-c')
 
271
        self.assertEquals(lines, ['z\n', 'c\n'])
 
272
 
 
273
        origins = k1.annotate('text-c')
 
274
        self.assertEquals(origins[0], ('text-c', 'z\n'))
 
275
        self.assertEquals(origins[1], ('text-b', 'c\n'))
 
276
 
 
277
    def test_get_line_delta_texts(self):
 
278
        """Make sure we can call get_texts on text with reused line deltas"""
 
279
        k1 = KnitVersionedFile('test1', get_transport('.'), 
 
280
                               factory=KnitPlainFactory(), create=True)
 
281
        for t in range(3):
 
282
            if t == 0:
 
283
                parents = []
 
284
            else:
 
285
                parents = ['%d' % (t-1)]
 
286
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
 
287
        k1.get_texts(('%d' % t) for t in range(3))
 
288
        
 
289
    def test_iter_lines_reads_in_order(self):
 
290
        t = MemoryTransport()
 
291
        instrumented_t = TransportLogger(t)
 
292
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
293
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
294
        # add texts with no required ordering
 
295
        k1.add_lines('base', [], ['text\n'])
 
296
        k1.add_lines('base2', [], ['text2\n'])
 
297
        k1.clear_cache()
 
298
        instrumented_t._calls = []
 
299
        # request a last-first iteration
 
300
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
 
301
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
302
        self.assertEqual(['text\n', 'text2\n'], results)
 
303
 
 
304
    def test_create_empty_annotated(self):
 
305
        k1 = self.make_test_knit(True)
 
306
        # 0
 
307
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
 
308
        k2 = k1.create_empty('t', MemoryTransport())
 
309
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
 
310
        self.assertEqual(k1.delta, k2.delta)
 
311
        # the generic test checks for empty content and file class
 
312
 
 
313
    def test_knit_format(self):
 
314
        # this tests that a new knit index file has the expected content
 
315
        # and that is writes the data we expect as records are added.
 
316
        knit = self.make_test_knit(True)
 
317
        # Now knit files are not created until we first add data to them
 
318
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
 
319
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
320
        self.assertFileEqual(
 
321
            "# bzr knit index 8\n"
 
322
            "\n"
 
323
            "revid fulltext 0 84 .a_ghost :",
 
324
            'test.kndx')
 
325
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
 
326
        self.assertFileEqual(
 
327
            "# bzr knit index 8\n"
 
328
            "\nrevid fulltext 0 84 .a_ghost :"
 
329
            "\nrevid2 line-delta 84 82 0 :",
 
330
            'test.kndx')
 
331
        # we should be able to load this file again
 
332
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
333
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
334
        # write a short write to the file and ensure that its ignored
 
335
        indexfile = file('test.kndx', 'at')
 
336
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
 
337
        indexfile.close()
 
338
        # we should be able to load this file again
 
339
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
 
340
        self.assertEqual(['revid', 'revid2'], knit.versions())
 
341
        # and add a revision with the same id the failed write had
 
342
        knit.add_lines('revid3', ['revid2'], ['a\n'])
 
343
        # and when reading it revid3 should now appear.
 
344
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
 
345
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
 
346
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
 
347
 
 
348
    def test_delay_create(self):
 
349
        """Test that passing delay_create=True creates files late"""
 
350
        knit = self.make_test_knit(annotate=True, delay_create=True)
 
351
        self.failIfExists('test.knit')
 
352
        self.failIfExists('test.kndx')
 
353
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
 
354
        self.failUnlessExists('test.knit')
 
355
        self.assertFileEqual(
 
356
            "# bzr knit index 8\n"
 
357
            "\n"
 
358
            "revid fulltext 0 84 .a_ghost :",
 
359
            'test.kndx')
 
360
 
 
361
    def test_create_parent_dir(self):
 
362
        """create_parent_dir can create knits in nonexistant dirs"""
 
363
        # Has no effect if we don't set 'delay_create'
 
364
        trans = get_transport('.')
 
365
        self.assertRaises(NoSuchFile, KnitVersionedFile, 'dir/test',
 
366
                          trans, access_mode='w', factory=None,
 
367
                          create=True, create_parent_dir=True)
 
368
        # Nothing should have changed yet
 
369
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
370
                                 factory=None, create=True,
 
371
                                 create_parent_dir=True,
 
372
                                 delay_create=True)
 
373
        self.failIfExists('dir/test.knit')
 
374
        self.failIfExists('dir/test.kndx')
 
375
        self.failIfExists('dir')
 
376
        knit.add_lines('revid', [], ['a\n'])
 
377
        self.failUnlessExists('dir')
 
378
        self.failUnlessExists('dir/test.knit')
 
379
        self.assertFileEqual(
 
380
            "# bzr knit index 8\n"
 
381
            "\n"
 
382
            "revid fulltext 0 84  :",
 
383
            'dir/test.kndx')
 
384
 
 
385
    def test_create_mode_700(self):
 
386
        trans = get_transport('.')
 
387
        if not trans._can_roundtrip_unix_modebits():
 
388
            # Can't roundtrip, so no need to run this test
 
389
            return
 
390
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
391
                                 factory=None, create=True,
 
392
                                 create_parent_dir=True,
 
393
                                 delay_create=True,
 
394
                                 file_mode=0600,
 
395
                                 dir_mode=0700)
 
396
        knit.add_lines('revid', [], ['a\n'])
 
397
        self.assertTransportMode(trans, 'dir', 0700)
 
398
        self.assertTransportMode(trans, 'dir/test.knit', 0600)
 
399
        self.assertTransportMode(trans, 'dir/test.kndx', 0600)
 
400
 
 
401
    def test_create_mode_770(self):
 
402
        trans = get_transport('.')
 
403
        if not trans._can_roundtrip_unix_modebits():
 
404
            # Can't roundtrip, so no need to run this test
 
405
            return
 
406
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
407
                                 factory=None, create=True,
 
408
                                 create_parent_dir=True,
 
409
                                 delay_create=True,
 
410
                                 file_mode=0660,
 
411
                                 dir_mode=0770)
 
412
        knit.add_lines('revid', [], ['a\n'])
 
413
        self.assertTransportMode(trans, 'dir', 0770)
 
414
        self.assertTransportMode(trans, 'dir/test.knit', 0660)
 
415
        self.assertTransportMode(trans, 'dir/test.kndx', 0660)
 
416
 
 
417
    def test_create_mode_777(self):
 
418
        trans = get_transport('.')
 
419
        if not trans._can_roundtrip_unix_modebits():
 
420
            # Can't roundtrip, so no need to run this test
 
421
            return
 
422
        knit = KnitVersionedFile('dir/test', trans, access_mode='w',
 
423
                                 factory=None, create=True,
 
424
                                 create_parent_dir=True,
 
425
                                 delay_create=True,
 
426
                                 file_mode=0666,
 
427
                                 dir_mode=0777)
 
428
        knit.add_lines('revid', [], ['a\n'])
 
429
        self.assertTransportMode(trans, 'dir', 0777)
 
430
        self.assertTransportMode(trans, 'dir/test.knit', 0666)
 
431
        self.assertTransportMode(trans, 'dir/test.kndx', 0666)
 
432
 
 
433
    def test_plan_merge(self):
 
434
        my_knit = self.make_test_knit(annotate=True)
 
435
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
 
436
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
 
437
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
 
438
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
 
439
        for plan_line, expected_line in zip(plan, AB_MERGE):
 
440
            self.assertEqual(plan_line, expected_line)
 
441
 
 
442
 
 
443
TEXT_1 = """\
 
444
Banana cup cakes:
 
445
 
 
446
- bananas
 
447
- eggs
 
448
- broken tea cups
 
449
"""
 
450
 
 
451
TEXT_1A = """\
 
452
Banana cup cake recipe
 
453
(serves 6)
 
454
 
 
455
- bananas
 
456
- eggs
 
457
- broken tea cups
 
458
- self-raising flour
 
459
"""
 
460
 
 
461
TEXT_1B = """\
 
462
Banana cup cake recipe
 
463
 
 
464
- bananas (do not use plantains!!!)
 
465
- broken tea cups
 
466
- flour
 
467
"""
 
468
 
 
469
delta_1_1a = """\
 
470
0,1,2
 
471
Banana cup cake recipe
 
472
(serves 6)
 
473
5,5,1
 
474
- self-raising flour
 
475
"""
 
476
 
 
477
TEXT_2 = """\
 
478
Boeuf bourguignon
 
479
 
 
480
- beef
 
481
- red wine
 
482
- small onions
 
483
- carrot
 
484
- mushrooms
 
485
"""
 
486
 
 
487
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
 
488
new-a|(serves 6)
 
489
unchanged|
 
490
killed-b|- bananas
 
491
killed-b|- eggs
 
492
new-b|- bananas (do not use plantains!!!)
 
493
unchanged|- broken tea cups
 
494
new-a|- self-raising flour
 
495
new-b|- flour
 
496
"""
 
497
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
 
498
 
 
499
 
 
500
def line_delta(from_lines, to_lines):
 
501
    """Generate line-based delta from one text to another"""
 
502
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
 
503
    for op in s.get_opcodes():
 
504
        if op[0] == 'equal':
 
505
            continue
 
506
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
 
507
        for i in range(op[3], op[4]):
 
508
            yield to_lines[i]
 
509
 
 
510
 
 
511
def apply_line_delta(basis_lines, delta_lines):
 
512
    """Apply a line-based perfect diff
 
513
    
 
514
    basis_lines -- text to apply the patch to
 
515
    delta_lines -- diff instructions and content
 
516
    """
 
517
    out = basis_lines[:]
 
518
    i = 0
 
519
    offset = 0
 
520
    while i < len(delta_lines):
 
521
        l = delta_lines[i]
 
522
        a, b, c = map(long, l.split(','))
 
523
        i = i + 1
 
524
        out[offset+a:offset+b] = delta_lines[i:i+c]
 
525
        i = i + c
 
526
        offset = offset + (b - a) + c
 
527
    return out
 
528
 
 
529
 
 
530
class TestWeaveToKnit(KnitTests):
 
531
 
 
532
    def test_weave_to_knit_matches(self):
 
533
        # check that the WeaveToKnit is_compatible function
 
534
        # registers True for a Weave to a Knit.
 
535
        w = Weave()
 
536
        k = self.make_test_knit()
 
537
        self.failUnless(WeaveToKnit.is_compatible(w, k))
 
538
        self.failIf(WeaveToKnit.is_compatible(k, w))
 
539
        self.failIf(WeaveToKnit.is_compatible(w, w))
 
540
        self.failIf(WeaveToKnit.is_compatible(k, k))
 
541
 
 
542
 
 
543
class TestKnitCaching(KnitTests):
 
544
    
 
545
    def create_knit(self, cache_add=False):
 
546
        k = self.make_test_knit(True)
 
547
        if cache_add:
 
548
            k.enable_cache()
 
549
 
 
550
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
551
        k.add_lines('text-2', [], split_lines(TEXT_2))
 
552
        return k
 
553
 
 
554
    def test_no_caching(self):
 
555
        k = self.create_knit()
 
556
        # Nothing should be cached without setting 'enable_cache'
 
557
        self.assertEqual({}, k._data._cache)
 
558
 
 
559
    def test_cache_add_and_clear(self):
 
560
        k = self.create_knit(True)
 
561
 
 
562
        self.assertEqual(['text-1', 'text-2'], sorted(k._data._cache.keys()))
 
563
 
 
564
        k.clear_cache()
 
565
        self.assertEqual({}, k._data._cache)
 
566
 
 
567
    def test_cache_data_read_raw(self):
 
568
        k = self.create_knit()
 
569
 
 
570
        # Now cache and read
 
571
        k.enable_cache()
 
572
 
 
573
        def read_one_raw(version):
 
574
            pos_map = k._get_components_positions([version])
 
575
            method, pos, size, next = pos_map[version]
 
576
            lst = list(k._data.read_records_iter_raw([(version, pos, size)]))
 
577
            self.assertEqual(1, len(lst))
 
578
            return lst[0]
 
579
 
 
580
        val = read_one_raw('text-1')
 
581
        self.assertEqual({'text-1':val[1]}, k._data._cache)
 
582
 
 
583
        k.clear_cache()
 
584
        # After clear, new reads are not cached
 
585
        self.assertEqual({}, k._data._cache)
 
586
 
 
587
        val2 = read_one_raw('text-1')
 
588
        self.assertEqual(val, val2)
 
589
        self.assertEqual({}, k._data._cache)
 
590
 
 
591
    def test_cache_data_read(self):
 
592
        k = self.create_knit()
 
593
 
 
594
        def read_one(version):
 
595
            pos_map = k._get_components_positions([version])
 
596
            method, pos, size, next = pos_map[version]
 
597
            lst = list(k._data.read_records_iter([(version, pos, size)]))
 
598
            self.assertEqual(1, len(lst))
 
599
            return lst[0]
 
600
 
 
601
        # Now cache and read
 
602
        k.enable_cache()
 
603
 
 
604
        val = read_one('text-2')
 
605
        self.assertEqual(['text-2'], k._data._cache.keys())
 
606
        self.assertEqual('text-2', val[0])
 
607
        content, digest = k._data._parse_record('text-2',
 
608
                                                k._data._cache['text-2'])
 
609
        self.assertEqual(content, val[1])
 
610
        self.assertEqual(digest, val[2])
 
611
 
 
612
        k.clear_cache()
 
613
        self.assertEqual({}, k._data._cache)
 
614
 
 
615
        val2 = read_one('text-2')
 
616
        self.assertEqual(val, val2)
 
617
        self.assertEqual({}, k._data._cache)
 
618
 
 
619
    def test_cache_read(self):
 
620
        k = self.create_knit()
 
621
        k.enable_cache()
 
622
 
 
623
        text = k.get_text('text-1')
 
624
        self.assertEqual(TEXT_1, text)
 
625
        self.assertEqual(['text-1'], k._data._cache.keys())
 
626
 
 
627
        k.clear_cache()
 
628
        self.assertEqual({}, k._data._cache)
 
629
 
 
630
        text = k.get_text('text-1')
 
631
        self.assertEqual(TEXT_1, text)
 
632
        self.assertEqual({}, k._data._cache)
 
633
 
 
634
 
 
635
class TestKnitIndex(KnitTests):
 
636
 
 
637
    def test_add_versions_dictionary_compresses(self):
 
638
        """Adding versions to the index should update the lookup dict"""
 
639
        knit = self.make_test_knit()
 
640
        idx = knit._index
 
641
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
642
        self.check_file_contents('test.kndx',
 
643
            '# bzr knit index 8\n'
 
644
            '\n'
 
645
            'a-1 fulltext 0 0  :'
 
646
            )
 
647
        idx.add_versions([('a-2', ['fulltext'], 0, 0, ['a-1']),
 
648
                          ('a-3', ['fulltext'], 0, 0, ['a-2']),
 
649
                         ])
 
650
        self.check_file_contents('test.kndx',
 
651
            '# bzr knit index 8\n'
 
652
            '\n'
 
653
            'a-1 fulltext 0 0  :\n'
 
654
            'a-2 fulltext 0 0 0 :\n'
 
655
            'a-3 fulltext 0 0 1 :'
 
656
            )
 
657
        self.assertEqual(['a-1', 'a-2', 'a-3'], idx._history)
 
658
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0),
 
659
                          'a-2':('a-2', ['fulltext'], 0, 0, ['a-1'], 1),
 
660
                          'a-3':('a-3', ['fulltext'], 0, 0, ['a-2'], 2),
 
661
                         }, idx._cache)
 
662
 
 
663
    def test_add_versions_fails_clean(self):
 
664
        """If add_versions fails in the middle, it restores a pristine state.
 
665
 
 
666
        Any modifications that are made to the index are reset if all versions
 
667
        cannot be added.
 
668
        """
 
669
        # This cheats a little bit by passing in a generator which will
 
670
        # raise an exception before the processing finishes
 
671
        # Other possibilities would be to have an version with the wrong number
 
672
        # of entries, or to make the backing transport unable to write any
 
673
        # files.
 
674
 
 
675
        knit = self.make_test_knit()
 
676
        idx = knit._index
 
677
        idx.add_version('a-1', ['fulltext'], 0, 0, [])
 
678
 
 
679
        class StopEarly(Exception):
 
680
            pass
 
681
 
 
682
        def generate_failure():
 
683
            """Add some entries and then raise an exception"""
 
684
            yield ('a-2', ['fulltext'], 0, 0, ['a-1'])
 
685
            yield ('a-3', ['fulltext'], 0, 0, ['a-2'])
 
686
            raise StopEarly()
 
687
 
 
688
        # Assert the pre-condition
 
689
        self.assertEqual(['a-1'], idx._history)
 
690
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)
 
691
 
 
692
        self.assertRaises(StopEarly, idx.add_versions, generate_failure())
 
693
 
 
694
        # And it shouldn't be modified
 
695
        self.assertEqual(['a-1'], idx._history)
 
696
        self.assertEqual({'a-1':('a-1', ['fulltext'], 0, 0, [], 0)}, idx._cache)