~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_knit.py

MergeĀ inĀ upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
21
21
 
22
22
 
23
23
from bzrlib.errors import KnitError, RevisionAlreadyPresent
24
 
from bzrlib.knit import (
25
 
    KnitVersionedFile,
26
 
    KnitPlainFactory,
27
 
    KnitAnnotateFactory,
28
 
    WeaveToKnit)
 
24
from bzrlib.knit import KnitVersionedFile, KnitPlainFactory, KnitAnnotateFactory
29
25
from bzrlib.osutils import split_lines
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.transport import TransportLogger, get_transport
 
26
from bzrlib.tests import TestCaseInTempDir
 
27
from bzrlib.transport.local import LocalTransport
32
28
from bzrlib.transport.memory import MemoryTransport
33
 
from bzrlib.weave import Weave
34
 
 
35
 
 
36
 
class KnitTests(TestCaseWithTransport):
37
 
    """Class containing knit test helper routines."""
 
29
from bzrlib.transactions import PassThroughTransaction
 
30
 
 
31
 
 
32
class KnitTests(TestCaseInTempDir):
 
33
 
 
34
    def add_stock_one_and_one_a(self, k):
 
35
        k.add_lines('text-1', [], split_lines(TEXT_1))
 
36
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
 
37
 
 
38
    def test_knit_constructor(self):
 
39
        """Construct empty k"""
 
40
        self.make_test_knit()
38
41
 
39
42
    def make_test_knit(self, annotate=False):
40
43
        if not annotate:
41
44
            factory = KnitPlainFactory()
42
45
        else:
43
46
            factory = None
44
 
        return KnitVersionedFile('test', get_transport('.'), access_mode='w', factory=factory, create=True)
45
 
 
46
 
 
47
 
class BasicKnitTests(KnitTests):
48
 
 
49
 
    def add_stock_one_and_one_a(self, k):
50
 
        k.add_lines('text-1', [], split_lines(TEXT_1))
51
 
        k.add_lines('text-1a', ['text-1'], split_lines(TEXT_1A))
52
 
 
53
 
    def test_knit_constructor(self):
54
 
        """Construct empty k"""
55
 
        self.make_test_knit()
 
47
        return KnitVersionedFile('test', LocalTransport('.'), access_mode='w', factory=factory, create=True)
56
48
 
57
49
    def test_knit_add(self):
58
50
        """Store one text in knit and retrieve"""
62
54
        self.assertEqualDiff(''.join(k.get_lines('text-1')), TEXT_1)
63
55
 
64
56
    def test_knit_reload(self):
65
 
        # test that the content in a reloaded knit is correct
 
57
        """Store and reload a knit"""
66
58
        k = self.make_test_knit()
67
59
        k.add_lines('text-1', [], split_lines(TEXT_1))
68
60
        del k
69
 
        k2 = KnitVersionedFile('test', get_transport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
 
61
        k2 = KnitVersionedFile('test', LocalTransport('.'), access_mode='r', factory=KnitPlainFactory(), create=True)
70
62
        self.assertTrue(k2.has_version('text-1'))
71
63
        self.assertEqualDiff(''.join(k2.get_lines('text-1')), TEXT_1)
72
64
 
94
86
    def test_incomplete(self):
95
87
        """Test if texts without a ending line-end can be inserted and
96
88
        extracted."""
97
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
 
89
        k = KnitVersionedFile('test', LocalTransport('.'), delta=False, create=True)
98
90
        k.add_lines('text-1', [], ['a\n',    'b'  ])
99
91
        k.add_lines('text-2', ['text-1'], ['a\rb\n', 'b\n'])
100
 
        # reopening ensures maximum room for confusion
101
 
        k = KnitVersionedFile('test', get_transport('.'), delta=False, create=True)
102
92
        self.assertEquals(k.get_lines('text-1'), ['a\n',    'b'  ])
103
93
        self.assertEquals(k.get_lines('text-2'), ['a\rb\n', 'b\n'])
104
94
 
126
116
 
127
117
    def test_add_delta(self):
128
118
        """Store in knit with parents"""
129
 
        k = KnitVersionedFile('test', get_transport('.'), factory=KnitPlainFactory(),
 
119
        k = KnitVersionedFile('test', LocalTransport('.'), factory=KnitPlainFactory(),
130
120
            delta=True, create=True)
131
121
        self.add_stock_one_and_one_a(k)
132
 
        k.clear_cache()
133
122
        self.assertEqualDiff(''.join(k.get_lines('text-1a')), TEXT_1A)
134
123
 
135
124
    def test_annotate(self):
136
125
        """Annotations"""
137
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
126
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
138
127
            delta=True, create=True)
139
128
        self.insert_and_test_small_annotate(k)
140
129
 
149
138
 
150
139
    def test_annotate_fulltext(self):
151
140
        """Annotations"""
152
 
        k = KnitVersionedFile('knit', get_transport('.'), factory=KnitAnnotateFactory(),
 
141
        k = KnitVersionedFile('knit', LocalTransport('.'), factory=KnitAnnotateFactory(),
153
142
            delta=False, create=True)
154
143
        self.insert_and_test_small_annotate(k)
155
144
 
228
217
 
229
218
    def test_knit_join(self):
230
219
        """Store in knit with parents"""
231
 
        k1 = KnitVersionedFile('test1', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
220
        k1 = KnitVersionedFile('test1', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
232
221
        k1.add_lines('text-a', [], split_lines(TEXT_1))
233
222
        k1.add_lines('text-b', ['text-a'], split_lines(TEXT_1))
234
223
 
237
226
 
238
227
        k1.add_lines('text-m', ['text-b', 'text-d'], split_lines(TEXT_1))
239
228
 
240
 
        k2 = KnitVersionedFile('test2', get_transport('.'), factory=KnitPlainFactory(), create=True)
 
229
        k2 = KnitVersionedFile('test2', LocalTransport('.'), factory=KnitPlainFactory(), create=True)
241
230
        count = k2.join(k1, version_ids=['text-m'])
242
231
        self.assertEquals(count, 5)
243
232
        self.assertTrue(k2.has_version('text-a'))
244
233
        self.assertTrue(k2.has_version('text-c'))
245
234
 
246
235
    def test_reannotate(self):
247
 
        k1 = KnitVersionedFile('knit1', get_transport('.'),
 
236
        k1 = KnitVersionedFile('knit1', LocalTransport('.'),
248
237
                               factory=KnitAnnotateFactory(), create=True)
249
238
        # 0
250
239
        k1.add_lines('text-a', [], ['a\n', 'b\n'])
251
240
        # 1
252
241
        k1.add_lines('text-b', ['text-a'], ['a\n', 'c\n'])
253
242
 
254
 
        k2 = KnitVersionedFile('test2', get_transport('.'),
 
243
        k2 = KnitVersionedFile('test2', LocalTransport('.'),
255
244
                               factory=KnitAnnotateFactory(), create=True)
256
245
        k2.join(k1, version_ids=['text-b'])
257
246
 
269
258
        self.assertEquals(lines, ['z\n', 'c\n'])
270
259
 
271
260
        origins = k1.annotate('text-c')
272
 
        self.assertEquals(origins[0], ('text-c', 'z\n'))
273
 
        self.assertEquals(origins[1], ('text-b', 'c\n'))
274
 
 
275
 
    def test_get_line_delta_texts(self):
276
 
        """Make sure we can call get_texts on text with reused line deltas"""
277
 
        k1 = KnitVersionedFile('test1', get_transport('.'), 
278
 
                               factory=KnitPlainFactory(), create=True)
279
 
        for t in range(3):
280
 
            if t == 0:
281
 
                parents = []
282
 
            else:
283
 
                parents = ['%d' % (t-1)]
284
 
            k1.add_lines('%d' % t, parents, ['hello\n'] * t)
285
 
        k1.get_texts(('%d' % t) for t in range(3))
286
 
        
287
 
    def test_iter_lines_reads_in_order(self):
288
 
        t = MemoryTransport()
289
 
        instrumented_t = TransportLogger(t)
290
 
        k1 = KnitVersionedFile('id', instrumented_t, create=True, delta=True)
291
 
        self.assertEqual([('id.kndx',)], instrumented_t._calls)
292
 
        # add texts with no required ordering
293
 
        k1.add_lines('base', [], ['text\n'])
294
 
        k1.add_lines('base2', [], ['text2\n'])
295
 
        k1.clear_cache()
296
 
        instrumented_t._calls = []
297
 
        # request a last-first iteration
298
 
        results = list(k1.iter_lines_added_or_present_in_versions(['base2', 'base']))
299
 
        self.assertEqual([('id.knit', [(0, 87), (87, 89)])], instrumented_t._calls)
300
 
        self.assertEqual(['text\n', 'text2\n'], results)
 
261
        self.assertEquals(origins[0], ('text-c', 'z\n')) 
 
262
        self.assertEquals(origins[1], ('text-b', 'c\n')) 
301
263
 
302
264
    def test_create_empty_annotated(self):
303
265
        k1 = self.make_test_knit(True)
308
270
        self.assertEqual(k1.delta, k2.delta)
309
271
        # the generic test checks for empty content and file class
310
272
 
311
 
    def test_knit_format(self):
312
 
        # this tests that a new knit index file has the expected content
313
 
        # and that is writes the data we expect as records are added.
314
 
        knit = self.make_test_knit(True)
315
 
        self.assertFileEqual("# bzr knit index 8\n", 'test.kndx')
316
 
        knit.add_lines_with_ghosts('revid', ['a_ghost'], ['a\n'])
317
 
        self.assertFileEqual(
318
 
            "# bzr knit index 8\n"
319
 
            "\n"
320
 
            "revid fulltext 0 84 .a_ghost :",
321
 
            'test.kndx')
322
 
        knit.add_lines_with_ghosts('revid2', ['revid'], ['a\n'])
323
 
        self.assertFileEqual(
324
 
            "# bzr knit index 8\n"
325
 
            "\nrevid fulltext 0 84 .a_ghost :"
326
 
            "\nrevid2 line-delta 84 82 0 :",
327
 
            'test.kndx')
328
 
        # we should be able to load this file again
329
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
330
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
331
 
        # write a short write to the file and ensure that its ignored
332
 
        indexfile = file('test.kndx', 'at')
333
 
        indexfile.write('\nrevid3 line-delta 166 82 1 2 3 4 5 .phwoar:demo ')
334
 
        indexfile.close()
335
 
        # we should be able to load this file again
336
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='w')
337
 
        self.assertEqual(['revid', 'revid2'], knit.versions())
338
 
        # and add a revision with the same id the failed write had
339
 
        knit.add_lines('revid3', ['revid2'], ['a\n'])
340
 
        # and when reading it revid3 should now appear.
341
 
        knit = KnitVersionedFile('test', get_transport('.'), access_mode='r')
342
 
        self.assertEqual(['revid', 'revid2', 'revid3'], knit.versions())
343
 
        self.assertEqual(['revid2'], knit.get_parents('revid3'))
344
 
 
345
 
    def test_plan_merge(self):
346
 
        my_knit = self.make_test_knit(annotate=True)
347
 
        my_knit.add_lines('text1', [], split_lines(TEXT_1))
348
 
        my_knit.add_lines('text1a', ['text1'], split_lines(TEXT_1A))
349
 
        my_knit.add_lines('text1b', ['text1'], split_lines(TEXT_1B))
350
 
        plan = list(my_knit.plan_merge('text1a', 'text1b'))
351
 
        for plan_line, expected_line in zip(plan, AB_MERGE):
352
 
            self.assertEqual(plan_line, expected_line)
353
 
 
354
273
 
355
274
TEXT_1 = """\
356
275
Banana cup cakes:
370
289
- self-raising flour
371
290
"""
372
291
 
373
 
TEXT_1B = """\
374
 
Banana cup cake recipe
375
 
 
376
 
- bananas (do not use plantains!!!)
377
 
- broken tea cups
378
 
- flour
379
 
"""
380
 
 
381
292
delta_1_1a = """\
382
293
0,1,2
383
294
Banana cup cake recipe
396
307
- mushrooms
397
308
"""
398
309
 
399
 
AB_MERGE_TEXT="""unchanged|Banana cup cake recipe
400
 
new-a|(serves 6)
401
 
unchanged|
402
 
killed-b|- bananas
403
 
killed-b|- eggs
404
 
new-b|- bananas (do not use plantains!!!)
405
 
unchanged|- broken tea cups
406
 
new-a|- self-raising flour
407
 
new-b|- flour
408
 
"""
409
 
AB_MERGE=[tuple(l.split('|')) for l in AB_MERGE_TEXT.splitlines(True)]
410
 
 
411
 
 
412
310
def line_delta(from_lines, to_lines):
413
311
    """Generate line-based delta from one text to another"""
414
312
    s = difflib.SequenceMatcher(None, from_lines, to_lines)
437
335
        i = i + c
438
336
        offset = offset + (b - a) + c
439
337
    return out
440
 
 
441
 
 
442
 
class TestWeaveToKnit(KnitTests):
443
 
 
444
 
    def test_weave_to_knit_matches(self):
445
 
        # check that the WeaveToKnit is_compatible function
446
 
        # registers True for a Weave to a Knit.
447
 
        w = Weave()
448
 
        k = self.make_test_knit()
449
 
        self.failUnless(WeaveToKnit.is_compatible(w, k))
450
 
        self.failIf(WeaveToKnit.is_compatible(k, w))
451
 
        self.failIf(WeaveToKnit.is_compatible(w, w))
452
 
        self.failIf(WeaveToKnit.is_compatible(k, k))