~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

[merge] Erik Bågfors: add --revision to bzr pull

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python2.4
 
2
 
 
3
# Copyright (C) 2005 by Canonical Ltd
 
4
 
 
5
# This program is free software; you can redistribute it and/or modify
 
6
# it under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation; either version 2 of the License, or
 
8
# (at your option) any later version.
 
9
 
 
10
# This program is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
# GNU General Public License for more details.
 
14
 
 
15
# You should have received a copy of the GNU General Public License
 
16
# along with this program; if not, write to the Free Software
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
 
 
20
# TODO: tests regarding version names
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
22
#       if it fails.
 
23
 
 
24
"""test suite for weave algorithm"""
 
25
 
 
26
from pprint import pformat
 
27
 
 
28
import bzrlib.errors as errors
 
29
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
 
30
from bzrlib.weavefile import write_weave, read_weave
 
31
from bzrlib.tests import TestCase
 
32
from bzrlib.osutils import sha_string
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
    def check_read_write(self, k):
 
43
        """Check the weave k can be written & re-read."""
 
44
        from tempfile import TemporaryFile
 
45
        tf = TemporaryFile()
 
46
 
 
47
        write_weave(k, tf)
 
48
        tf.seek(0)
 
49
        k2 = read_weave(tf)
 
50
 
 
51
        if k != k2:
 
52
            tf.seek(0)
 
53
            self.log('serialized weave:')
 
54
            self.log(tf.read())
 
55
 
 
56
            self.log('')
 
57
            self.log('parents: %s' % (k._parents == k2._parents))
 
58
            self.log('         %r' % k._parents)
 
59
            self.log('         %r' % k2._parents)
 
60
            self.log('')
 
61
            self.fail('read/write check failed')
 
62
 
 
63
 
 
64
class WeaveContains(TestBase):
 
65
    """Weave __contains__ operator"""
 
66
    def runTest(self):
 
67
        k = Weave()
 
68
        self.assertFalse('foo' in k)
 
69
        k.add('foo', [], TEXT_1)
 
70
        self.assertTrue('foo' in k)
 
71
 
 
72
 
 
73
class Easy(TestBase):
 
74
    def runTest(self):
 
75
        k = Weave()
 
76
 
 
77
 
 
78
class StoreText(TestBase):
 
79
    """Store and retrieve a simple text."""
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        idx = k.add('text0', [], TEXT_0)
 
83
        self.assertEqual(k.get(idx), TEXT_0)
 
84
        self.assertEqual(idx, 0)
 
85
 
 
86
 
 
87
class AnnotateOne(TestBase):
 
88
    def runTest(self):
 
89
        k = Weave()
 
90
        k.add('text0', [], TEXT_0)
 
91
        self.assertEqual(k.annotate(0),
 
92
                         [(0, TEXT_0[0])])
 
93
 
 
94
 
 
95
class StoreTwo(TestBase):
 
96
    def runTest(self):
 
97
        k = Weave()
 
98
 
 
99
        idx = k.add('text0', [], TEXT_0)
 
100
        self.assertEqual(idx, 0)
 
101
 
 
102
        idx = k.add('text1', [], TEXT_1)
 
103
        self.assertEqual(idx, 1)
 
104
 
 
105
        self.assertEqual(k.get(0), TEXT_0)
 
106
        self.assertEqual(k.get(1), TEXT_1)
 
107
 
 
108
 
 
109
class AddWithGivenSha(TestBase):
 
110
    def runTest(self):
 
111
        """Add with caller-supplied SHA-1"""
 
112
        k = Weave()
 
113
 
 
114
        t = 'text0'
 
115
        k.add('text0', [], [t], sha1=sha_string(t))
 
116
 
 
117
 
 
118
class GetSha1(TestBase):
 
119
    def test_get_sha1(self):
 
120
        k = Weave()
 
121
        k.add('text0', [], 'text0')
 
122
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
123
                         k.get_sha1('text0'))
 
124
        self.assertRaises(errors.WeaveRevisionNotPresent,
 
125
                          k.get_sha1, 0)
 
126
        self.assertRaises(errors.WeaveRevisionNotPresent,
 
127
                          k.get_sha1, 'text1')
 
128
                        
 
129
 
 
130
class InvalidAdd(TestBase):
 
131
    """Try to use invalid version number during add."""
 
132
    def runTest(self):
 
133
        k = Weave()
 
134
 
 
135
        self.assertRaises(IndexError,
 
136
                          k.add,
 
137
                          'text0',
 
138
                          [69],
 
139
                          ['new text!'])
 
140
 
 
141
 
 
142
class RepeatedAdd(TestBase):
 
143
    """Add the same version twice; harmless."""
 
144
    def runTest(self):
 
145
        k = Weave()
 
146
        idx = k.add('text0', [], TEXT_0)
 
147
        idx2 = k.add('text0', [], TEXT_0)
 
148
        self.assertEqual(idx, idx2)
 
149
 
 
150
 
 
151
class InvalidRepeatedAdd(TestBase):
 
152
    def runTest(self):
 
153
        k = Weave()
 
154
        idx = k.add('text0', [], TEXT_0)
 
155
        self.assertRaises(WeaveError,
 
156
                          k.add,
 
157
                          'text0',
 
158
                          [],
 
159
                          ['not the same text'])
 
160
        self.assertRaises(WeaveError,
 
161
                          k.add,
 
162
                          'text0',
 
163
                          [12],         # not the right parents
 
164
                          TEXT_0)
 
165
        
 
166
 
 
167
class InsertLines(TestBase):
 
168
    """Store a revision that adds one line to the original.
 
169
 
 
170
    Look at the annotations to make sure that the first line is matched
 
171
    and not stored repeatedly."""
 
172
    def runTest(self):
 
173
        k = Weave()
 
174
 
 
175
        k.add('text0', [], ['line 1'])
 
176
        k.add('text1', [0], ['line 1', 'line 2'])
 
177
 
 
178
        self.assertEqual(k.annotate(0),
 
179
                         [(0, 'line 1')])
 
180
 
 
181
        self.assertEqual(k.get(1),
 
182
                         ['line 1',
 
183
                          'line 2'])
 
184
 
 
185
        self.assertEqual(k.annotate(1),
 
186
                         [(0, 'line 1'),
 
187
                          (1, 'line 2')])
 
188
 
 
189
        k.add('text2', [0], ['line 1', 'diverged line'])
 
190
 
 
191
        self.assertEqual(k.annotate(2),
 
192
                         [(0, 'line 1'),
 
193
                          (2, 'diverged line')])
 
194
 
 
195
        text3 = ['line 1', 'middle line', 'line 2']
 
196
        k.add('text3',
 
197
              [0, 1],
 
198
              text3)
 
199
 
 
200
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
201
 
 
202
        self.log("k._weave=" + pformat(k._weave))
 
203
 
 
204
        self.assertEqual(k.annotate(3),
 
205
                         [(0, 'line 1'),
 
206
                          (3, 'middle line'),
 
207
                          (1, 'line 2')])
 
208
 
 
209
        # now multiple insertions at different places
 
210
        k.add('text4',
 
211
              [0, 1, 3],
 
212
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
213
 
 
214
        self.assertEqual(k.annotate(4), 
 
215
                         [(0, 'line 1'),
 
216
                          (4, 'aaa'),
 
217
                          (3, 'middle line'),
 
218
                          (4, 'bbb'),
 
219
                          (1, 'line 2'),
 
220
                          (4, 'ccc')])
 
221
 
 
222
 
 
223
class DeleteLines(TestBase):
 
224
    """Deletion of lines from existing text.
 
225
 
 
226
    Try various texts all based on a common ancestor."""
 
227
    def runTest(self):
 
228
        k = Weave()
 
229
 
 
230
        base_text = ['one', 'two', 'three', 'four']
 
231
 
 
232
        k.add('text0', [], base_text)
 
233
        
 
234
        texts = [['one', 'two', 'three'],
 
235
                 ['two', 'three', 'four'],
 
236
                 ['one', 'four'],
 
237
                 ['one', 'two', 'three', 'four'],
 
238
                 ]
 
239
 
 
240
        i = 1
 
241
        for t in texts:
 
242
            ver = k.add('text%d' % i,
 
243
                        [0], t)
 
244
            i += 1
 
245
 
 
246
        self.log('final weave:')
 
247
        self.log('k._weave=' + pformat(k._weave))
 
248
 
 
249
        for i in range(len(texts)):
 
250
            self.assertEqual(k.get(i+1),
 
251
                             texts[i])
 
252
 
 
253
 
 
254
class SuicideDelete(TestBase):
 
255
    """Invalid weave which tries to add and delete simultaneously."""
 
256
    def runTest(self):
 
257
        k = Weave()
 
258
 
 
259
        k._parents = [(),
 
260
                ]
 
261
        k._weave = [('{', 0),
 
262
                'first line',
 
263
                ('[', 0),
 
264
                'deleted in 0',
 
265
                (']', 0),
 
266
                ('}', 0),
 
267
                ]
 
268
        ################################### SKIPPED
 
269
        # Weave.get doesn't trap this anymore
 
270
        return 
 
271
 
 
272
        self.assertRaises(WeaveFormatError,
 
273
                          k.get,
 
274
                          0)        
 
275
 
 
276
 
 
277
class CannedDelete(TestBase):
 
278
    """Unpack canned weave with deleted lines."""
 
279
    def runTest(self):
 
280
        k = Weave()
 
281
 
 
282
        k._parents = [(),
 
283
                frozenset([0]),
 
284
                ]
 
285
        k._weave = [('{', 0),
 
286
                'first line',
 
287
                ('[', 1),
 
288
                'line to be deleted',
 
289
                (']', 1),
 
290
                'last line',
 
291
                ('}', 0),
 
292
                ]
 
293
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
294
                  , sha_string('first linelast line')]
 
295
 
 
296
        self.assertEqual(k.get(0),
 
297
                         ['first line',
 
298
                          'line to be deleted',
 
299
                          'last line',
 
300
                          ])
 
301
 
 
302
        self.assertEqual(k.get(1),
 
303
                         ['first line',
 
304
                          'last line',
 
305
                          ])
 
306
 
 
307
 
 
308
class CannedReplacement(TestBase):
 
309
    """Unpack canned weave with deleted lines."""
 
310
    def runTest(self):
 
311
        k = Weave()
 
312
 
 
313
        k._parents = [frozenset(),
 
314
                frozenset([0]),
 
315
                ]
 
316
        k._weave = [('{', 0),
 
317
                'first line',
 
318
                ('[', 1),
 
319
                'line to be deleted',
 
320
                (']', 1),
 
321
                ('{', 1),
 
322
                'replacement line',                
 
323
                ('}', 1),
 
324
                'last line',
 
325
                ('}', 0),
 
326
                ]
 
327
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
328
                  , sha_string('first linereplacement linelast line')]
 
329
 
 
330
        self.assertEqual(k.get(0),
 
331
                         ['first line',
 
332
                          'line to be deleted',
 
333
                          'last line',
 
334
                          ])
 
335
 
 
336
        self.assertEqual(k.get(1),
 
337
                         ['first line',
 
338
                          'replacement line',
 
339
                          'last line',
 
340
                          ])
 
341
 
 
342
 
 
343
class BadWeave(TestBase):
 
344
    """Test that we trap an insert which should not occur."""
 
345
    def runTest(self):
 
346
        k = Weave()
 
347
 
 
348
        k._parents = [frozenset(),
 
349
                ]
 
350
        k._weave = ['bad line',
 
351
                ('{', 0),
 
352
                'foo {',
 
353
                ('{', 1),
 
354
                '  added in version 1',
 
355
                ('{', 2),
 
356
                '  added in v2',
 
357
                ('}', 2),
 
358
                '  also from v1',
 
359
                ('}', 1),
 
360
                '}',
 
361
                ('}', 0)]
 
362
 
 
363
        ################################### SKIPPED
 
364
        # Weave.get doesn't trap this anymore
 
365
        return 
 
366
 
 
367
 
 
368
        self.assertRaises(WeaveFormatError,
 
369
                          k.get,
 
370
                          0)
 
371
 
 
372
 
 
373
class BadInsert(TestBase):
 
374
    """Test that we trap an insert which should not occur."""
 
375
    def runTest(self):
 
376
        k = Weave()
 
377
 
 
378
        k._parents = [frozenset(),
 
379
                frozenset([0]),
 
380
                frozenset([0]),
 
381
                frozenset([0,1,2]),
 
382
                ]
 
383
        k._weave = [('{', 0),
 
384
                'foo {',
 
385
                ('{', 1),
 
386
                '  added in version 1',
 
387
                ('{', 1),
 
388
                '  more in 1',
 
389
                ('}', 1),
 
390
                ('}', 1),
 
391
                ('}', 0)]
 
392
 
 
393
 
 
394
        # this is not currently enforced by get
 
395
        return  ##########################################
 
396
 
 
397
        self.assertRaises(WeaveFormatError,
 
398
                          k.get,
 
399
                          0)
 
400
 
 
401
        self.assertRaises(WeaveFormatError,
 
402
                          k.get,
 
403
                          1)
 
404
 
 
405
 
 
406
class InsertNested(TestBase):
 
407
    """Insertion with nested instructions."""
 
408
    def runTest(self):
 
409
        k = Weave()
 
410
 
 
411
        k._parents = [frozenset(),
 
412
                frozenset([0]),
 
413
                frozenset([0]),
 
414
                frozenset([0,1,2]),
 
415
                ]
 
416
        k._weave = [('{', 0),
 
417
                'foo {',
 
418
                ('{', 1),
 
419
                '  added in version 1',
 
420
                ('{', 2),
 
421
                '  added in v2',
 
422
                ('}', 2),
 
423
                '  also from v1',
 
424
                ('}', 1),
 
425
                '}',
 
426
                ('}', 0)]
 
427
 
 
428
        k._sha1s = [sha_string('foo {}')
 
429
                  , sha_string('foo {  added in version 1  also from v1}')
 
430
                  , sha_string('foo {  added in v2}')
 
431
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
432
                  ]
 
433
 
 
434
        self.assertEqual(k.get(0),
 
435
                         ['foo {',
 
436
                          '}'])
 
437
 
 
438
        self.assertEqual(k.get(1),
 
439
                         ['foo {',
 
440
                          '  added in version 1',
 
441
                          '  also from v1',
 
442
                          '}'])
 
443
                       
 
444
        self.assertEqual(k.get(2),
 
445
                         ['foo {',
 
446
                          '  added in v2',
 
447
                          '}'])
 
448
 
 
449
        self.assertEqual(k.get(3),
 
450
                         ['foo {',
 
451
                          '  added in version 1',
 
452
                          '  added in v2',
 
453
                          '  also from v1',
 
454
                          '}'])
 
455
                         
 
456
 
 
457
class DeleteLines2(TestBase):
 
458
    """Test recording revisions that delete lines.
 
459
 
 
460
    This relies on the weave having a way to represent lines knocked
 
461
    out by a later revision."""
 
462
    def runTest(self):
 
463
        k = Weave()
 
464
 
 
465
        k.add('text0', [], ["line the first",
 
466
                   "line 2",
 
467
                   "line 3",
 
468
                   "fine"])
 
469
 
 
470
        self.assertEqual(len(k.get(0)), 4)
 
471
 
 
472
        k.add('text1', [0], ["line the first",
 
473
                   "fine"])
 
474
 
 
475
        self.assertEqual(k.get(1),
 
476
                         ["line the first",
 
477
                          "fine"])
 
478
 
 
479
        self.assertEqual(k.annotate(1),
 
480
                         [(0, "line the first"),
 
481
                          (0, "fine")])
 
482
 
 
483
 
 
484
class IncludeVersions(TestBase):
 
485
    """Check texts that are stored across multiple revisions.
 
486
 
 
487
    Here we manually create a weave with particular encoding and make
 
488
    sure it unpacks properly.
 
489
 
 
490
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
491
    lines.
 
492
    """
 
493
 
 
494
    def runTest(self):
 
495
        k = Weave()
 
496
 
 
497
        k._parents = [frozenset(), frozenset([0])]
 
498
        k._weave = [('{', 0),
 
499
                "first line",
 
500
                ('}', 0),
 
501
                ('{', 1),
 
502
                "second line",
 
503
                ('}', 1)]
 
504
 
 
505
        k._sha1s = [sha_string('first line')
 
506
                  , sha_string('first linesecond line')]
 
507
 
 
508
        self.assertEqual(k.get(1),
 
509
                         ["first line",
 
510
                          "second line"])
 
511
 
 
512
        self.assertEqual(k.get(0),
 
513
                         ["first line"])
 
514
 
 
515
 
 
516
class DivergedIncludes(TestBase):
 
517
    """Weave with two diverged texts based on version 0.
 
518
    """
 
519
    def runTest(self):
 
520
        k = Weave()
 
521
 
 
522
        k._parents = [frozenset(),
 
523
                frozenset([0]),
 
524
                frozenset([0]),
 
525
                ]
 
526
        k._weave = [('{', 0),
 
527
                "first line",
 
528
                ('}', 0),
 
529
                ('{', 1),
 
530
                "second line",
 
531
                ('}', 1),
 
532
                ('{', 2),
 
533
                "alternative second line",
 
534
                ('}', 2),                
 
535
                ]
 
536
 
 
537
        k._sha1s = [sha_string('first line')
 
538
                  , sha_string('first linesecond line')
 
539
                  , sha_string('first linealternative second line')]
 
540
 
 
541
        self.assertEqual(k.get(0),
 
542
                         ["first line"])
 
543
 
 
544
        self.assertEqual(k.get(1),
 
545
                         ["first line",
 
546
                          "second line"])
 
547
 
 
548
        self.assertEqual(k.get(2),
 
549
                         ["first line",
 
550
                          "alternative second line"])
 
551
 
 
552
        self.assertEqual(list(k.inclusions([2])),
 
553
                         [0, 2])
 
554
 
 
555
 
 
556
class ReplaceLine(TestBase):
 
557
    def runTest(self):
 
558
        k = Weave()
 
559
 
 
560
        text0 = ['cheddar', 'stilton', 'gruyere']
 
561
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
562
        
 
563
        k.add('text0', [], text0)
 
564
        k.add('text1', [0], text1)
 
565
 
 
566
        self.log('k._weave=' + pformat(k._weave))
 
567
 
 
568
        self.assertEqual(k.get(0), text0)
 
569
        self.assertEqual(k.get(1), text1)
 
570
 
 
571
 
 
572
class Merge(TestBase):
 
573
    """Storage of versions that merge diverged parents"""
 
574
    def runTest(self):
 
575
        k = Weave()
 
576
 
 
577
        texts = [['header'],
 
578
                 ['header', '', 'line from 1'],
 
579
                 ['header', '', 'line from 2', 'more from 2'],
 
580
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
581
                 ]
 
582
 
 
583
        k.add('text0', [], texts[0])
 
584
        k.add('text1', [0], texts[1])
 
585
        k.add('text2', [0], texts[2])
 
586
        k.add('merge', [0, 1, 2], texts[3])
 
587
 
 
588
        for i, t in enumerate(texts):
 
589
            self.assertEqual(k.get(i), t)
 
590
 
 
591
        self.assertEqual(k.annotate(3),
 
592
                         [(0, 'header'),
 
593
                          (1, ''),
 
594
                          (1, 'line from 1'),
 
595
                          (3, 'fixup line'),
 
596
                          (2, 'line from 2'),
 
597
                          ])
 
598
 
 
599
        self.assertEqual(list(k.inclusions([3])),
 
600
                         [0, 1, 2, 3])
 
601
 
 
602
        self.log('k._weave=' + pformat(k._weave))
 
603
 
 
604
        self.check_read_write(k)
 
605
 
 
606
 
 
607
class Conflicts(TestBase):
 
608
    """Test detection of conflicting regions during a merge.
 
609
 
 
610
    A base version is inserted, then two descendents try to
 
611
    insert different lines in the same place.  These should be
 
612
    reported as a possible conflict and forwarded to the user."""
 
613
    def runTest(self):
 
614
        return  # NOT RUN
 
615
        k = Weave()
 
616
 
 
617
        k.add([], ['aaa', 'bbb'])
 
618
        k.add([0], ['aaa', '111', 'bbb'])
 
619
        k.add([1], ['aaa', '222', 'bbb'])
 
620
 
 
621
        merged = k.merge([1, 2])
 
622
 
 
623
        self.assertEquals([[['aaa']],
 
624
                           [['111'], ['222']],
 
625
                           [['bbb']]])
 
626
 
 
627
 
 
628
class NonConflict(TestBase):
 
629
    """Two descendants insert compatible changes.
 
630
 
 
631
    No conflict should be reported."""
 
632
    def runTest(self):
 
633
        return  # NOT RUN
 
634
        k = Weave()
 
635
 
 
636
        k.add([], ['aaa', 'bbb'])
 
637
        k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
 
638
        k.add([1], ['aaa', 'ccc', 'bbb', '222'])
 
639
 
 
640
 
 
641
class AutoMerge(TestBase):
 
642
    def runTest(self):
 
643
        k = Weave()
 
644
 
 
645
        texts = [['header', 'aaa', 'bbb'],
 
646
                 ['header', 'aaa', 'line from 1', 'bbb'],
 
647
                 ['header', 'aaa', 'bbb', 'line from 2', 'more from 2'],
 
648
                 ]
 
649
 
 
650
        k.add('text0', [], texts[0])
 
651
        k.add('text1', [0], texts[1])
 
652
        k.add('text2', [0], texts[2])
 
653
 
 
654
        self.log('k._weave=' + pformat(k._weave))
 
655
 
 
656
        m = list(k.mash_iter([0, 1, 2]))
 
657
 
 
658
        self.assertEqual(m,
 
659
                         ['header', 'aaa',
 
660
                          'line from 1',
 
661
                          'bbb',
 
662
                          'line from 2', 'more from 2'])
 
663
 
 
664
 
 
665
class Khayyam(TestBase):
 
666
    """Test changes to multi-line texts, and read/write"""
 
667
    def runTest(self):
 
668
        rawtexts = [
 
669
            """A Book of Verses underneath the Bough,
 
670
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
671
            Beside me singing in the Wilderness --
 
672
            Oh, Wilderness were Paradise enow!""",
 
673
            
 
674
            """A Book of Verses underneath the Bough,
 
675
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
676
            Beside me singing in the Wilderness --
 
677
            Oh, Wilderness were Paradise now!""",
 
678
 
 
679
            """A Book of poems underneath the tree,
 
680
            A Jug of Wine, a Loaf of Bread,
 
681
            and Thou
 
682
            Beside me singing in the Wilderness --
 
683
            Oh, Wilderness were Paradise now!
 
684
 
 
685
            -- O. Khayyam""",
 
686
 
 
687
            """A Book of Verses underneath the Bough,
 
688
            A Jug of Wine, a Loaf of Bread,
 
689
            and Thou
 
690
            Beside me singing in the Wilderness --
 
691
            Oh, Wilderness were Paradise now!""",
 
692
            ]
 
693
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
694
 
 
695
        k = Weave()
 
696
        parents = set()
 
697
        i = 0
 
698
        for t in texts:
 
699
            ver = k.add('text%d' % i,
 
700
                        list(parents), t)
 
701
            parents.add(ver)
 
702
            i += 1
 
703
 
 
704
        self.log("k._weave=" + pformat(k._weave))
 
705
 
 
706
        for i, t in enumerate(texts):
 
707
            self.assertEqual(k.get(i), t)
 
708
 
 
709
        self.check_read_write(k)
 
710
 
 
711
 
 
712
class MergeCases(TestBase):
 
713
    def doMerge(self, base, a, b, mp):
 
714
        from cStringIO import StringIO
 
715
        from textwrap import dedent
 
716
 
 
717
        def addcrlf(x):
 
718
            return x + '\n'
 
719
        
 
720
        w = Weave()
 
721
        w.add('text0', [], map(addcrlf, base))
 
722
        w.add('text1', [0], map(addcrlf, a))
 
723
        w.add('text2', [0], map(addcrlf, b))
 
724
 
 
725
        self.log('weave is:')
 
726
        tmpf = StringIO()
 
727
        write_weave(w, tmpf)
 
728
        self.log(tmpf.getvalue())
 
729
 
 
730
        self.log('merge plan:')
 
731
        p = list(w.plan_merge(1, 2))
 
732
        for state, line in p:
 
733
            if line:
 
734
                self.log('%12s | %s' % (state, line[:-1]))
 
735
 
 
736
        self.log('merge:')
 
737
        mt = StringIO()
 
738
        mt.writelines(w.weave_merge(p))
 
739
        mt.seek(0)
 
740
        self.log(mt.getvalue())
 
741
 
 
742
        mp = map(addcrlf, mp)
 
743
        self.assertEqual(mt.readlines(), mp)
 
744
        
 
745
        
 
746
    def testOneInsert(self):
 
747
        self.doMerge([],
 
748
                     ['aa'],
 
749
                     [],
 
750
                     ['aa'])
 
751
 
 
752
    def testSeparateInserts(self):
 
753
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
754
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
755
                     ['aaa', 'bbb', 'yyy', 'ccc'],
 
756
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
757
 
 
758
    def testSameInsert(self):
 
759
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
760
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
761
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
 
762
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
763
 
 
764
    def testOverlappedInsert(self):
 
765
        self.doMerge(['aaa', 'bbb'],
 
766
                     ['aaa', 'xxx', 'yyy', 'bbb'],
 
767
                     ['aaa', 'xxx', 'bbb'],
 
768
                     ['aaa', '<<<<<<<', 'xxx', 'yyy', '=======', 'xxx', 
 
769
                      '>>>>>>>', 'bbb'])
 
770
 
 
771
        # really it ought to reduce this to 
 
772
        # ['aaa', 'xxx', 'yyy', 'bbb']
 
773
 
 
774
 
 
775
    def testClashReplace(self):
 
776
        self.doMerge(['aaa'],
 
777
                     ['xxx'],
 
778
                     ['yyy', 'zzz'],
 
779
                     ['<<<<<<<', 'xxx', '=======', 'yyy', 'zzz', 
 
780
                      '>>>>>>>'])
 
781
 
 
782
    def testNonClashInsert(self):
 
783
        self.doMerge(['aaa'],
 
784
                     ['xxx', 'aaa'],
 
785
                     ['yyy', 'zzz'],
 
786
                     ['<<<<<<<', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
 
787
                      '>>>>>>>'])
 
788
 
 
789
        self.doMerge(['aaa'],
 
790
                     ['aaa'],
 
791
                     ['yyy', 'zzz'],
 
792
                     ['yyy', 'zzz'])
 
793
 
 
794
 
 
795
    def testDeleteAndModify(self):
 
796
        """Clashing delete and modification.
 
797
 
 
798
        If one side modifies a region and the other deletes it then
 
799
        there should be a conflict with one side blank.
 
800
        """
 
801
 
 
802
        #######################################
 
803
        # skippd, not working yet
 
804
        return
 
805
        
 
806
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
807
                     ['aaa', 'ddd', 'ccc'],
 
808
                     ['aaa', 'ccc'],
 
809
                     ['<<<<<<<<', 'aaa', '=======', '>>>>>>>', 'ccc'])
 
810
 
 
811
 
 
812
class JoinWeavesTests(TestBase):
 
813
    def setUp(self):
 
814
        super(JoinWeavesTests, self).setUp()
 
815
        self.weave1 = Weave()
 
816
        self.lines1 = ['hello\n']
 
817
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
818
        self.weave1.add('v1', [], self.lines1)
 
819
        self.weave1.add('v2', [0], ['hello\n', 'world\n'])
 
820
        self.weave1.add('v3', [1], self.lines3)
 
821
        
 
822
    def test_join_empty(self):
 
823
        """Join two empty weaves."""
 
824
        eq = self.assertEqual
 
825
        w1 = Weave()
 
826
        w2 = Weave()
 
827
        w1.join(w2)
 
828
        eq(w1.numversions(), 0)
 
829
        
 
830
    def test_join_empty_to_nonempty(self):
 
831
        """Join empty weave onto nonempty."""
 
832
        self.weave1.join(Weave())
 
833
        self.assertEqual(len(self.weave1), 3)
 
834
 
 
835
    def test_join_unrelated(self):
 
836
        """Join two weaves with no history in common."""
 
837
        wb = Weave()
 
838
        wb.add('b1', [], ['line from b\n'])
 
839
        w1 = self.weave1
 
840
        w1.join(wb)
 
841
        eq = self.assertEqual
 
842
        eq(len(w1), 4)
 
843
        eq(sorted(list(w1.iter_names())),
 
844
           ['b1', 'v1', 'v2', 'v3'])
 
845
 
 
846
    def test_join_related(self):
 
847
        wa = self.weave1.copy()
 
848
        wb = self.weave1.copy()
 
849
        wa.add('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
850
        wb.add('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
851
        eq = self.assertEquals
 
852
        eq(len(wa), 4)
 
853
        eq(len(wb), 4)
 
854
        wa.join(wb)
 
855
        eq(len(wa), 5)
 
856
        eq(wa.get_lines('b1'),
 
857
           ['hello\n', 'pale blue\n', 'world\n'])
 
858
 
 
859
    def test_join_parent_disagreement(self):
 
860
        """Cannot join weaves with different parents for a version."""
 
861
        wa = Weave()
 
862
        wb = Weave()
 
863
        wa.add('v1', [], ['hello\n'])
 
864
        wb.add('v0', [], [])
 
865
        wb.add('v1', ['v0'], ['hello\n'])
 
866
        self.assertRaises(WeaveError,
 
867
                          wa.join, wb)
 
868
 
 
869
    def test_join_text_disagreement(self):
 
870
        """Cannot join weaves with different texts for a version."""
 
871
        wa = Weave()
 
872
        wb = Weave()
 
873
        wa.add('v1', [], ['hello\n'])
 
874
        wb.add('v1', [], ['not\n', 'hello\n'])
 
875
        self.assertRaises(WeaveError,
 
876
                          wa.join, wb)
 
877
 
 
878
    def test_join_unordered(self):
 
879
        """Join weaves where indexes differ.
 
880
        
 
881
        The source weave contains a different version at index 0."""
 
882
        wa = self.weave1.copy()
 
883
        wb = Weave()
 
884
        wb.add('x1', [], ['line from x1\n'])
 
885
        wb.add('v1', [], ['hello\n'])
 
886
        wb.add('v2', ['v1'], ['hello\n', 'world\n'])
 
887
        wa.join(wb)
 
888
        eq = self.assertEquals
 
889
        eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
 
890
        eq(wa.get_text('x1'), 'line from x1\n')
 
891
 
 
892
 
 
893
class Corruption(TestCase):
 
894
 
 
895
    def test_detection(self):
 
896
        # Test weaves detect corruption.
 
897
        #
 
898
        # Weaves contain a checksum of their texts.
 
899
        # When a text is extracted, this checksum should be
 
900
        # verified.
 
901
 
 
902
        w = Weave()
 
903
        w.add('v1', [], ['hello\n'])
 
904
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
905
 
 
906
        # We are going to invasively corrupt the text
 
907
        # Make sure the internals of weave are the same
 
908
        self.assertEqual([('{', 0)
 
909
                        , 'hello\n'
 
910
                        , ('}', None)
 
911
                        , ('{', 1)
 
912
                        , 'there\n'
 
913
                        , ('}', None)
 
914
                        ], w._weave)
 
915
 
 
916
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
917
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
918
                        ], w._sha1s)
 
919
        w.check()
 
920
 
 
921
        # Corrupted
 
922
        w._weave[4] = 'There\n'
 
923
 
 
924
        self.assertEqual('hello\n', w.get_text('v1'))
 
925
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
926
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
927
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
928
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
929
 
 
930
        # Corrected
 
931
        w._weave[4] = 'there\n'
 
932
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
933
 
 
934
        #Invalid checksum, first digit changed
 
935
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
936
 
 
937
        self.assertEqual('hello\n', w.get_text('v1'))
 
938
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
939
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
940
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
941
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
942
 
 
943
    def test_written_detection(self):
 
944
        # Test detection of weave file corruption.
 
945
        #
 
946
        # Make sure that we can detect if a weave file has
 
947
        # been corrupted. This doesn't test all forms of corruption,
 
948
        # but it at least helps verify the data you get, is what you want.
 
949
        from cStringIO import StringIO
 
950
 
 
951
        w = Weave()
 
952
        w.add('v1', [], ['hello\n'])
 
953
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
 
954
 
 
955
        tmpf = StringIO()
 
956
        write_weave(w, tmpf)
 
957
 
 
958
        # Because we are corrupting, we need to make sure we have the exact text
 
959
        self.assertEquals('# bzr weave file v5\n'
 
960
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
961
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
962
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
963
                          tmpf.getvalue())
 
964
 
 
965
        # Change a single letter
 
966
        tmpf = StringIO('# bzr weave file v5\n'
 
967
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
968
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
969
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
970
 
 
971
        w = read_weave(tmpf)
 
972
 
 
973
        self.assertEqual('hello\n', w.get_text('v1'))
 
974
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
975
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
976
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
977
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
978
 
 
979
        # Change the sha checksum
 
980
        tmpf = StringIO('# bzr weave file v5\n'
 
981
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
982
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
983
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
984
 
 
985
        w = read_weave(tmpf)
 
986
 
 
987
        self.assertEqual('hello\n', w.get_text('v1'))
 
988
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
989
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
990
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
 
991
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
992
 
 
993
 
 
994
class InstrumentedWeave(Weave):
 
995
    """Keep track of how many times functions are called."""
 
996
    
 
997
    def __init__(self, weave_name=None):
 
998
        self._extract_count = 0
 
999
        Weave.__init__(self, weave_name=weave_name)
 
1000
 
 
1001
    def _extract(self, versions):
 
1002
        self._extract_count += 1
 
1003
        return Weave._extract(self, versions)
 
1004
 
 
1005
 
 
1006
class JoinOptimization(TestCase):
 
1007
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
1008
 
 
1009
    def test_join(self):
 
1010
        w1 = InstrumentedWeave()
 
1011
        w2 = InstrumentedWeave()
 
1012
 
 
1013
        txt0 = ['a\n']
 
1014
        txt1 = ['a\n', 'b\n']
 
1015
        txt2 = ['a\n', 'c\n']
 
1016
        txt3 = ['a\n', 'b\n', 'c\n']
 
1017
 
 
1018
        w1.add('txt0', [], txt0) # extract 1a
 
1019
        w2.add('txt0', [], txt0) # extract 1b
 
1020
        w1.add('txt1', [0], txt1)# extract 2a
 
1021
        w2.add('txt2', [0], txt2)# extract 2b
 
1022
        w1.join(w2) # extract 3a to add txt2 
 
1023
        w2.join(w1) # extract 3b to add txt1 
 
1024
 
 
1025
        w1.add('txt3', [1, 2], txt3) # extract 4a 
 
1026
        w2.add('txt3', [1, 2], txt3) # extract 4b
 
1027
        # These secretly have inverted parents
 
1028
 
 
1029
        # This should not have to do any extractions
 
1030
        w1.join(w2) # NO extract, texts already present with same parents
 
1031
        w2.join(w1) # NO extract, texts already present with same parents
 
1032
 
 
1033
        self.assertEqual(4, w1._extract_count)
 
1034
        self.assertEqual(4, w2._extract_count)
 
1035
 
 
1036
    def test_double_parent(self):
 
1037
        # It should not be considered illegal to add
 
1038
        # a revision with the same parent twice
 
1039
        w1 = InstrumentedWeave()
 
1040
        w2 = InstrumentedWeave()
 
1041
 
 
1042
        txt0 = ['a\n']
 
1043
        txt1 = ['a\n', 'b\n']
 
1044
        txt2 = ['a\n', 'c\n']
 
1045
        txt3 = ['a\n', 'b\n', 'c\n']
 
1046
 
 
1047
        w1.add('txt0', [], txt0)
 
1048
        w2.add('txt0', [], txt0)
 
1049
        w1.add('txt1', [0], txt1)
 
1050
        w2.add('txt1', [0,0], txt1)
 
1051
        # Same text, effectively the same, because the
 
1052
        # parent is only repeated
 
1053
        w1.join(w2) # extract 3a to add txt2 
 
1054
        w2.join(w1) # extract 3b to add txt1 
 
1055
 
 
1056
 
 
1057
class MismatchedTexts(TestCase):
 
1058
    """Test that merging two weaves with different texts fails."""
 
1059
 
 
1060
    def test_reweave(self):
 
1061
        w1 = Weave('a')
 
1062
        w2 = Weave('b')
 
1063
 
 
1064
        w1.add('txt0', [], ['a\n'])
 
1065
        w2.add('txt0', [], ['a\n'])
 
1066
        w1.add('txt1', [0], ['a\n', 'b\n'])
 
1067
        w2.add('txt1', [0], ['a\n', 'c\n'])
 
1068
 
 
1069
        self.assertRaises(errors.WeaveTextDiffers, w1.reweave, w2)
 
1070
 
 
1071