~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

  • Committer: Robert Collins
  • Date: 2006-03-28 14:29:13 UTC
  • mto: (1626.2.1 integration)
  • mto: This revision was merged to the branch mainline in revision 1628.
  • Revision ID: robertc@robertcollins.net-20060328142913-ac5afb37075719c6
Convert log to use the new tsort.merge_sort routine.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 
22
22
 
23
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
 
from bzrlib.knit import (
25
 
    KnitVersionedFile,
26
 
    KnitPlainFactory,
27
 
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
 
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
29
25
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.transport import TransportLogger, get_transport
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.transport import TransportLogger
 
28
from bzrlib.transport.local import LocalTransport
32
29
from bzrlib.transport.memory import MemoryTransport
33
 
from bzrlib.weave import Weave
34
 
 
35
 
 
36
 
class KnitTests(TestCaseWithTransport):
37
 
    """Class containing knit test helper routines."""
 
30
 
 
31
 
 
32
class KnitTests(TestCaseInTempDir):
 
33
 
 
34
    def add_stock_one_and_one_a(self, k):
 
35
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
36
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
37
 
 
38
    def test_knit_constructor(self):
 
39
        """Construct empty k"""
 
40
        self.make_test_knit()
38
41
 
39
42
    def make_test_knit(self, annotate=False):
40
43
        if not annotate:
41
44
            factory = KnitPlainFactory()
42
45
        else:
43
46
            factory = None
44
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
45
 
 
46
 
 
47
 
class BasicKnitTests(KnitTests):
48
 
 
49
 
    def add_stock_one_and_one_a(self, k):
50
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
51
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
52
 
 
53
 
    def test_knit_constructor(self):
54
 
        """Construct empty k"""
55
 
        self.make_test_knit()
 
47
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
56
48
 
57
49
    def test_knit_add(self):
58
50
        """Store one text in knit and retrieve"""
62
54
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
63
55
 
64
56
    def test_knit_reload(self):
65
 
        # test that the content in a reloaded knit is correct
 
57
        """Store and reload a knit"""
66
58
        k = self.make_test_knit()
67
59
        k.add_lines('text-1', [], split_lines(TEXT_1))
68
60
        del k
69
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
61
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
62
        self.assertTrue(k2.has_version('text-1'))
71
63
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
72
64
 
94
86
    def test_incomplete(self):
95
87
        """Test if texts without a ending line-end can be inserted and
96
88
        extracted."""
97
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
89
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
98
90
        k.add_lines('text-1', [], ['a\n',    'b'  ])
99
91
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
100
 
        # reopening ensures maximum room for confusion
101
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
102
92
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
103
93
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
104
94
 
126
116
 
127
117
    def test_add_delta(self):
128
118
        """Store in knit with parents"""
129
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
119
        k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
130
120
            delta=True, create=True)
131
121
        self.add_stock_one_and_one_a(k)
132
122
        k.clear_cache()
134
124
 
135
125
    def test_annotate(self):
136
126
        """Annotations"""
137
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
127
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
138
128
            delta=True, create=True)
139
129
        self.insert_and_test_small_annotate(k)
140
130
 
149
139
 
150
140
    def test_annotate_fulltext(self):
151
141
        """Annotations"""
152
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
142
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
153
143
            delta=False, create=True)
154
144
        self.insert_and_test_small_annotate(k)
155
145
 
228
218
 
229
219
    def test_knit_join(self):
230
220
        """Store in knit with parents"""
231
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
221
        k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
232
222
        k1.add_lines('text-a', [], split_lines(TEXT_1))
233
223
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
234
224
 
237
227
 
238
228
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
239
229
 
240
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
230
        k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
241
231
        count = k2.join(k1, version_ids=['text-m'])
242
232
        self.assertEquals(count, 5)
243
233
        self.assertTrue(k2.has_version('text-a'))
244
234
        self.assertTrue(k2.has_version('text-c'))
245
235
 
246
236
    def test_reannotate(self):
247
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
237
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
248
238
                               factory=KnitAnnotateFactory(), create=True)
249
239
        # 0
250
240
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
251
241
        # 1
252
242
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
253
243
 
254
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
244
        k2 = KnitVersionedFile('test2', LocalTransport('.'),
255
245
                               factory=KnitAnnotateFactory(), create=True)
256
246
        k2.join(k1, version_ids=['text-b'])
257
247
 
272
262
        self.assertEquals(origins[0], ('text-c', 'z\n'))
273
263
        self.assertEquals(origins[1], ('text-b', 'c\n'))
274
264
 
275
 
    def test_get_line_delta_texts(self):
276
 
        """Make sure we can call get_texts on text with reused line deltas"""
277
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
278
 
                               factory=KnitPlainFactory(), create=True)
279
 
        for t in range(3):
280
 
            if t == 0:
281
 
                parents = []
282
 
            else:
283
 
                parents = ['%d' % (t-1)]
284
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
 
        k1.get_texts(('%d' % t) for t in range(3))
 
265
    def test_extraction_reads_components_once(self):
 
266
        t = MemoryTransport()
 
267
        instrumented_t = TransportLogger(t)
 
268
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
 
269
        # should read the index
 
270
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
 
271
        instrumented_t._calls = []
 
272
        # add a text       
 
273
        k1.add_lines('base', [], ['text\n'])
 
274
        # should not have read at all
 
275
        self.assertEqual([], instrumented_t._calls)
 
276
 
 
277
        # add a text
 
278
        k1.add_lines('sub', ['base'], ['text\n', 'text2\n'])
 
279
        # should not have read at all
 
280
        self.assertEqual([], instrumented_t._calls)
 
281
        
 
282
        # read a text
 
283
        k1.get_lines('sub')
 
284
        # should not have read at all
 
285
        self.assertEqual([], instrumented_t._calls)
 
286
 
 
287
        # clear the cache
 
288
        k1.clear_cache()
 
289
 
 
290
        # read a text
 
291
        k1.get_lines('base')
 
292
        # should have read a component
 
293
        # should not have read the first component only
 
294
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
 
295
        instrumented_t._calls = []
 
296
        # read again
 
297
        k1.get_lines('base')
 
298
        # should not have read at all
 
299
        self.assertEqual([], instrumented_t._calls)
 
300
        # and now read the other component
 
301
        k1.get_lines('sub')
 
302
        # should have read the second component
 
303
        self.assertEqual([('id.knit', [(87, 93)])], instrumented_t._calls)
 
304
        instrumented_t._calls = []
 
305
 
 
306
        # clear the cache
 
307
        k1.clear_cache()
 
308
        # add a text cold 
 
309
        k1.add_lines('sub2', ['base'], ['text\n', 'text3\n'])
 
310
        # should read the first component only
 
311
        self.assertEqual([('id.knit', [(0, 87)])], instrumented_t._calls)
286
312
        
287
313
    def test_iter_lines_reads_in_order(self):
288
314
        t = MemoryTransport()
296
322
        instrumented_t._calls = []
297
323
        # request a last-first iteration
298
324
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
299
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
 
325
        self.assertEqual([('id.knit', [(0, 87), (87, 90)])], instrumented_t._calls)
300
326
        self.assertEqual(['text\n', 'text2\n'], results)
301
327
 
302
328
    def test_create_empty_annotated(self):
308
334
        self.assertEqual(k1.delta, k2.delta)
309
335
        # the generic test checks for empty content and file class
310
336
 
311
 
    def test_knit_format(self):
312
 
        # this tests that a new knit index file has the expected content
313
 
        # and that is writes the data we expect as records are added.
314
 
        knit = self.make_test_knit(True)
315
 
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
316
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
317
 
        self.assertFileEqual(
318
 
            "# bzr knit index 8\n"
319
 
            "\n"
320
 
            "revid fulltext 0 84 .a_ghost :",
321
 
            'test.kndx')
322
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
323
 
        self.assertFileEqual(
324
 
            "# bzr knit index 8\n"
325
 
            "\nrevid fulltext 0 84 .a_ghost :"
326
 
            "\nrevid2 line-delta 84 82 0 :",
327
 
            'test.kndx')
328
 
        # we should be able to load this file again
329
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
330
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
331
 
        # write a short write to the file and ensure that its ignored
332
 
        indexfile = file('test.kndx', 'at')
333
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
334
 
        indexfile.close()
335
 
        # we should be able to load this file again
336
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
337
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
338
 
        # and add a revision with the same id the failed write had
339
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
340
 
        # and when reading it revid3 should now appear.
341
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
342
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
343
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
344
 
 
345
 
    def test_plan_merge(self):
346
 
        my_knit = self.make_test_knit(annotate=True)
347
 
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
348
 
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
349
 
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
350
 
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
351
 
        for plan_line, expected_line in zip(plan, AB_MERGE):
352
 
            self.assertEqual(plan_line, expected_line)
353
 
 
354
337
 
355
338
TEXT_1 = """\
356
339
Banana cup cakes:
370
353
- self-raising flour
371
354
"""
372
355
 
373
 
TEXT_1B = """\
374
 
Banana cup cake recipe
375
 
 
376
 
- bananas (do not use plantains!!!)
377
 
- broken tea cups
378
 
- flour
379
 
"""
380
 
 
381
356
delta_1_1a = """\
382
357
0,1,2
383
358
Banana cup cake recipe
396
371
- mushrooms
397
372
"""
398
373
 
399
 
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
400
 
new-a|(serves 6)
401
 
unchanged|
402
 
killed-b|- bananas
403
 
killed-b|- eggs
404
 
new-b|- bananas (do not use plantains!!!)
405
 
unchanged|- broken tea cups
406
 
new-a|- self-raising flour
407
 
new-b|- flour
408
 
"""
409
 
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
410
 
 
411
 
 
412
374
def line_delta(from_lines, to_lines):
413
375
    """Generate line-based delta from one text to another"""
414
376
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
437
399
        i = i + c
438
400
        offset = offset + (b - a) + c
439
401
    return out
440
 
 
441
 
 
442
 
class TestWeaveToKnit(KnitTests):
443
 
 
444
 
    def test_weave_to_knit_matches(self):
445
 
        # check that the WeaveToKnit is_compatible function
446
 
        # registers True for a Weave to a Knit.
447
 
        w = Weave()
448
 
        k = self.make_test_knit()
449
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
450
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
451
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
452
 
        self.failIf(WeaveToKnit.is_compatible(k, k))