~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Martin Pool
  • Date: 2005-08-12 15:41:44 UTC
  • Revision ID: mbp@sourcefrog.net-20050812154144-bc98570a78b8f633
- merge in deferred revfile work

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005, 2006 by Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
"""Tests for Knit data structure"""
18
 
 
19
 
 
20
 
import difflib
21
 
 
22
 
 
23
 
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
 
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
25
 
from bzrlib.osutils import split_lines
26
 
from bzrlib.tests import TestCaseInTempDir
27
 
from bzrlib.transport import TransportLogger
28
 
from bzrlib.transport.local import LocalTransport
29
 
from bzrlib.transport.memory import MemoryTransport
30
 
 
31
 
 
32
 
class KnitTests(TestCaseInTempDir):
33
 
 
34
 
    def add_stock_one_and_one_a(self, k):
35
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
36
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
37
 
 
38
 
    def test_knit_constructor(self):
39
 
        """Construct empty k"""
40
 
        self.make_test_knit()
41
 
 
42
 
    def make_test_knit(self, annotate=False):
43
 
        if not annotate:
44
 
            factory = KnitPlainFactory()
45
 
        else:
46
 
            factory = None
47
 
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
48
 
 
49
 
    def test_knit_add(self):
50
 
        """Store one text in knit and retrieve"""
51
 
        k = self.make_test_knit()
52
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
53
 
        self.assertTrue(k.has_version('text-1'))
54
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
55
 
 
56
 
    def test_knit_reload(self):
57
 
        # test that the content in a reloaded knit is correct
58
 
        k = self.make_test_knit()
59
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
60
 
        del k
61
 
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
62
 
        self.assertTrue(k2.has_version('text-1'))
63
 
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
64
 
 
65
 
    def test_knit_several(self):
66
 
        """Store several texts in a knit"""
67
 
        k = self.make_test_knit()
68
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
69
 
        k.add_lines('text-2', [], split_lines(TEXT_2))
70
 
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
71
 
        self.assertEqualDiff(''.join(k.get_lines('text-2')), TEXT_2)
72
 
        
73
 
    def test_repeated_add(self):
74
 
        """Knit traps attempt to replace existing version"""
75
 
        k = self.make_test_knit()
76
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
77
 
        self.assertRaises(RevisionAlreadyPresent, 
78
 
                k.add_lines,
79
 
                'text-1', [], split_lines(TEXT_1))
80
 
 
81
 
    def test_empty(self):
82
 
        k = self.make_test_knit(True)
83
 
        k.add_lines('text-1', [], [])
84
 
        self.assertEquals(k.get_lines('text-1'), [])
85
 
 
86
 
    def test_incomplete(self):
87
 
        """Test if texts without a ending line-end can be inserted and
88
 
        extracted."""
89
 
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
90
 
        k.add_lines('text-1', [], ['a\n',    'b'  ])
91
 
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
92
 
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
93
 
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
94
 
 
95
 
    def test_delta(self):
96
 
        """Expression of knit delta as lines"""
97
 
        k = self.make_test_knit()
98
 
        td = list(line_delta(TEXT_1.splitlines(True),
99
 
                             TEXT_1A.splitlines(True)))
100
 
        self.assertEqualDiff(''.join(td), delta_1_1a)
101
 
        out = apply_line_delta(TEXT_1.splitlines(True), td)
102
 
        self.assertEqualDiff(''.join(out), TEXT_1A)
103
 
 
104
 
    def test_add_with_parents(self):
105
 
        """Store in knit with parents"""
106
 
        k = self.make_test_knit()
107
 
        self.add_stock_one_and_one_a(k)
108
 
        self.assertEquals(k.get_parents('text-1'), [])
109
 
        self.assertEquals(k.get_parents('text-1a'), ['text-1'])
110
 
 
111
 
    def test_ancestry(self):
112
 
        """Store in knit with parents"""
113
 
        k = self.make_test_knit()
114
 
        self.add_stock_one_and_one_a(k)
115
 
        self.assertEquals(set(k.get_ancestry(['text-1a'])), set(['text-1a', 'text-1']))
116
 
 
117
 
    def test_add_delta(self):
118
 
        """Store in knit with parents"""
119
 
        k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
120
 
            delta=True, create=True)
121
 
        self.add_stock_one_and_one_a(k)
122
 
        k.clear_cache()
123
 
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
124
 
 
125
 
    def test_annotate(self):
126
 
        """Annotations"""
127
 
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
128
 
            delta=True, create=True)
129
 
        self.insert_and_test_small_annotate(k)
130
 
 
131
 
    def insert_and_test_small_annotate(self, k):
132
 
        """test annotation with k works correctly."""
133
 
        k.add_lines('text-1', [], ['a\n', 'b\n'])
134
 
        k.add_lines('text-2', ['text-1'], ['a\n', 'c\n'])
135
 
 
136
 
        origins = k.annotate('text-2')
137
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
138
 
        self.assertEquals(origins[1], ('text-2', 'c\n'))
139
 
 
140
 
    def test_annotate_fulltext(self):
141
 
        """Annotations"""
142
 
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
143
 
            delta=False, create=True)
144
 
        self.insert_and_test_small_annotate(k)
145
 
 
146
 
    def test_annotate_merge_1(self):
147
 
        k = self.make_test_knit(True)
148
 
        k.add_lines('text-a1', [], ['a\n', 'b\n'])
149
 
        k.add_lines('text-a2', [], ['d\n', 'c\n'])
150
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['d\n', 'b\n'])
151
 
        origins = k.annotate('text-am')
152
 
        self.assertEquals(origins[0], ('text-a2', 'd\n'))
153
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
154
 
 
155
 
    def test_annotate_merge_2(self):
156
 
        k = self.make_test_knit(True)
157
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
158
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
159
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['a\n', 'y\n', 'c\n'])
160
 
        origins = k.annotate('text-am')
161
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
162
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
163
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
164
 
 
165
 
    def test_annotate_merge_9(self):
166
 
        k = self.make_test_knit(True)
167
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
168
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
169
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'c\n'])
170
 
        origins = k.annotate('text-am')
171
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
172
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
173
 
        self.assertEquals(origins[2], ('text-a1', 'c\n'))
174
 
 
175
 
    def test_annotate_merge_3(self):
176
 
        k = self.make_test_knit(True)
177
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
178
 
        k.add_lines('text-a2', [] ,['x\n', 'y\n', 'z\n'])
179
 
        k.add_lines('text-am', ['text-a1', 'text-a2'], ['k\n', 'y\n', 'z\n'])
180
 
        origins = k.annotate('text-am')
181
 
        self.assertEquals(origins[0], ('text-am', 'k\n'))
182
 
        self.assertEquals(origins[1], ('text-a2', 'y\n'))
183
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
184
 
 
185
 
    def test_annotate_merge_4(self):
186
 
        k = self.make_test_knit(True)
187
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
188
 
        k.add_lines('text-a2', [], ['x\n', 'y\n', 'z\n'])
189
 
        k.add_lines('text-a3', ['text-a1'], ['a\n', 'b\n', 'p\n'])
190
 
        k.add_lines('text-am', ['text-a2', 'text-a3'], ['a\n', 'b\n', 'z\n'])
191
 
        origins = k.annotate('text-am')
192
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
193
 
        self.assertEquals(origins[1], ('text-a1', 'b\n'))
194
 
        self.assertEquals(origins[2], ('text-a2', 'z\n'))
195
 
 
196
 
    def test_annotate_merge_5(self):
197
 
        k = self.make_test_knit(True)
198
 
        k.add_lines('text-a1', [], ['a\n', 'b\n', 'c\n'])
199
 
        k.add_lines('text-a2', [], ['d\n', 'e\n', 'f\n'])
200
 
        k.add_lines('text-a3', [], ['x\n', 'y\n', 'z\n'])
201
 
        k.add_lines('text-am',
202
 
                    ['text-a1', 'text-a2', 'text-a3'],
203
 
                    ['a\n', 'e\n', 'z\n'])
204
 
        origins = k.annotate('text-am')
205
 
        self.assertEquals(origins[0], ('text-a1', 'a\n'))
206
 
        self.assertEquals(origins[1], ('text-a2', 'e\n'))
207
 
        self.assertEquals(origins[2], ('text-a3', 'z\n'))
208
 
 
209
 
    def test_annotate_file_cherry_pick(self):
210
 
        k = self.make_test_knit(True)
211
 
        k.add_lines('text-1', [], ['a\n', 'b\n', 'c\n'])
212
 
        k.add_lines('text-2', ['text-1'], ['d\n', 'e\n', 'f\n'])
213
 
        k.add_lines('text-3', ['text-2', 'text-1'], ['a\n', 'b\n', 'c\n'])
214
 
        origins = k.annotate('text-3')
215
 
        self.assertEquals(origins[0], ('text-1', 'a\n'))
216
 
        self.assertEquals(origins[1], ('text-1', 'b\n'))
217
 
        self.assertEquals(origins[2], ('text-1', 'c\n'))
218
 
 
219
 
    def test_knit_join(self):
220
 
        """Store in knit with parents"""
221
 
        k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
222
 
        k1.add_lines('text-a', [], split_lines(TEXT_1))
223
 
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
224
 
 
225
 
        k1.add_lines('text-c', [], split_lines(TEXT_1))
226
 
        k1.add_lines('text-d', ['text-c'], split_lines(TEXT_1))
227
 
 
228
 
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
229
 
 
230
 
        k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
231
 
        count = k2.join(k1, version_ids=['text-m'])
232
 
        self.assertEquals(count, 5)
233
 
        self.assertTrue(k2.has_version('text-a'))
234
 
        self.assertTrue(k2.has_version('text-c'))
235
 
 
236
 
    def test_reannotate(self):
237
 
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
238
 
                               factory=KnitAnnotateFactory(), create=True)
239
 
        # 0
240
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
241
 
        # 1
242
 
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
243
 
 
244
 
        k2 = KnitVersionedFile('test2', LocalTransport('.'),
245
 
                               factory=KnitAnnotateFactory(), create=True)
246
 
        k2.join(k1, version_ids=['text-b'])
247
 
 
248
 
        # 2
249
 
        k1.add_lines('text-X', ['text-b'], ['a\n', 'b\n'])
250
 
        # 2
251
 
        k2.add_lines('text-c', ['text-b'], ['z\n', 'c\n'])
252
 
        # 3
253
 
        k2.add_lines('text-Y', ['text-b'], ['b\n', 'c\n'])
254
 
 
255
 
        # test-c will have index 3
256
 
        k1.join(k2, version_ids=['text-c'])
257
 
 
258
 
        lines = k1.get_lines('text-c')
259
 
        self.assertEquals(lines, ['z\n', 'c\n'])
260
 
 
261
 
        origins = k1.annotate('text-c')
262
 
        self.assertEquals(origins[0], ('text-c', 'z\n'))
263
 
        self.assertEquals(origins[1], ('text-b', 'c\n'))
264
 
 
265
 
    def test_extraction_reads_components_once(self):
266
 
        t = MemoryTransport()
267
 
        instrumented_t = TransportLogger(t)
268
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
269
 
        # should read the index
270
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
271
 
        instrumented_t._calls = []
272
 
        # add a text       
273
 
        k1.add_lines('base', [], ['text\n'])
274
 
        # should not have read at all
275
 
        self.assertEqual([], instrumented_t._calls)
276
 
 
277
 
        # add a text
278
 
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
279
 
        # should not have read at all
280
 
        self.assertEqual([], instrumented_t._calls)
281
 
        
282
 
        # read a text
283
 
        k1.get_lines('sub')
284
 
        # should not have read at all
285
 
        self.assertEqual([], instrumented_t._calls)
286
 
 
287
 
        # clear the cache
288
 
        k1.clear_cache()
289
 
 
290
 
        # read a text
291
 
        k1.get_lines('base')
292
 
        # should have read a component
293
 
        # should not have read the first component only
294
 
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
295
 
        instrumented_t._calls = []
296
 
        # read again
297
 
        k1.get_lines('base')
298
 
        # should not have read at all
299
 
        self.assertEqual([], instrumented_t._calls)
300
 
        # and now read the other component
301
 
        k1.get_lines('sub')
302
 
        # should have read the second component
303
 
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
304
 
        instrumented_t._calls = []
305
 
 
306
 
        # clear the cache
307
 
        k1.clear_cache()
308
 
        # add a text cold 
309
 
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
310
 
        # should read the first component only
311
 
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
312
 
        
313
 
    def test_iter_lines_reads_in_order(self):
314
 
        t = MemoryTransport()
315
 
        instrumented_t = TransportLogger(t)
316
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
317
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
318
 
        # add texts with no required ordering
319
 
        k1.add_lines('base', [], ['text\n'])
320
 
        k1.add_lines('base2', [], ['text2\n'])
321
 
        k1.clear_cache()
322
 
        instrumented_t._calls = []
323
 
        # request a last-first iteration
324
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
325
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
326
 
        self.assertEqual(['text\n', 'text2\n'], results)
327
 
 
328
 
    def test_create_empty_annotated(self):
329
 
        k1 = self.make_test_knit(True)
330
 
        # 0
331
 
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
332
 
        k2 = k1.create_empty('t', MemoryTransport())
333
 
        self.assertTrue(isinstance(k2.factory, KnitAnnotateFactory))
334
 
        self.assertEqual(k1.delta, k2.delta)
335
 
        # the generic test checks for empty content and file class
336
 
 
337
 
    def test_knit_format(self):
338
 
        # this tests that a new knit index file has the expected content
339
 
        # and that is writes the data we expect as records are added.
340
 
        knit = self.make_test_knit(True)
341
 
        self.assertFileEqual("# bzr knit index 7\n", 'test.kndx')
342
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
343
 
        self.assertFileEqual(
344
 
            "# bzr knit index 7\n"
345
 
            "\n"
346
 
            "revid fulltext 0 84 .a_ghost :",
347
 
            'test.kndx')
348
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
349
 
        self.assertFileEqual(
350
 
            "# bzr knit index 7\n"
351
 
            "\nrevid fulltext 0 84 .a_ghost :"
352
 
            "\nrevid2 line-delta 84 82 0 :",
353
 
            'test.kndx')
354
 
        # we should be able to load this file again
355
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
356
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
357
 
        # write a short write to the file and ensure that its ignored
358
 
        indexfile = file('test.kndx', 'at')
359
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
360
 
        indexfile.close()
361
 
        # we should be able to load this file again
362
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='w')
363
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
364
 
        # and add a revision with the same id the failed write had
365
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
366
 
        # and when reading it revid3 should now appear.
367
 
        knit = KnitVersionedFile('test', LocalTransport('.'), access_mode='r')
368
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
369
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
370
 
 
371
 
 
372
 
 
373
 
TEXT_1 = """\
374
 
Banana cup cakes:
375
 
 
376
 
- bananas
377
 
- eggs
378
 
- broken tea cups
379
 
"""
380
 
 
381
 
TEXT_1A = """\
382
 
Banana cup cake recipe
383
 
(serves 6)
384
 
 
385
 
- bananas
386
 
- eggs
387
 
- broken tea cups
388
 
- self-raising flour
389
 
"""
390
 
 
391
 
delta_1_1a = """\
392
 
0,1,2
393
 
Banana cup cake recipe
394
 
(serves 6)
395
 
5,5,1
396
 
- self-raising flour
397
 
"""
398
 
 
399
 
TEXT_2 = """\
400
 
Boeuf bourguignon
401
 
 
402
 
- beef
403
 
- red wine
404
 
- small onions
405
 
- carrot
406
 
- mushrooms
407
 
"""
408
 
 
409
 
def line_delta(from_lines, to_lines):
410
 
    """Generate line-based delta from one text to another"""
411
 
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
412
 
    for op in s.get_opcodes():
413
 
        if op[0] == 'equal':
414
 
            continue
415
 
        yield '%d,%d,%d\n' % (op[1], op[2], op[4]-op[3])
416
 
        for i in range(op[3], op[4]):
417
 
            yield to_lines[i]
418
 
 
419
 
 
420
 
def apply_line_delta(basis_lines, delta_lines):
421
 
    """Apply a line-based perfect diff
422
 
    
423
 
    basis_lines -- text to apply the patch to
424
 
    delta_lines -- diff instructions and content
425
 
    """
426
 
    out = basis_lines[:]
427
 
    i = 0
428
 
    offset = 0
429
 
    while i < len(delta_lines):
430
 
        l = delta_lines[i]
431
 
        a, b, c = map(long, l.split(','))
432
 
        i = i + 1
433
 
        out[offset+a:offset+b] = delta_lines[i:i+c]
434
 
        i = i + c
435
 
        offset = offset + (b - a) + c
436
 
    return out