~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Martin Pool
  • Date: 2005-06-22 06:37:43 UTC
  • Revision ID: mbp@sourcefrog.net-20050622063743-e395f04c4db8977f
- move old blackbox code from testbzr into bzrlib.selftest.blackbox

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/python2.4
2
 
 
3
 
# Copyright (C) 2005 by Canonical Ltd
4
 
 
5
 
# This program is free software; you can redistribute it and/or modify
6
 
# it under the terms of the GNU General Public License as published by
7
 
# the Free Software Foundation; either version 2 of the License, or
8
 
# (at your option) any later version.
9
 
 
10
 
# This program is distributed in the hope that it will be useful,
11
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
# GNU General Public License for more details.
14
 
 
15
 
# You should have received a copy of the GNU General Public License
16
 
# along with this program; if not, write to the Free Software
17
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
 
 
19
 
 
20
 
# TODO: tests regarding version names
21
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
22
 
#       if it fails.
23
 
 
24
 
"""test suite for weave algorithm"""
25
 
 
26
 
from pprint import pformat
27
 
 
28
 
import bzrlib.errors as errors
29
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
30
 
from bzrlib.weavefile import write_weave, read_weave
31
 
from bzrlib.tests import TestCase
32
 
from bzrlib.osutils import sha_string
33
 
 
34
 
 
35
 
# texts for use in testing
36
 
TEXT_0 = ["Hello world"]
37
 
TEXT_1 = ["Hello world",
38
 
          "A second line"]
39
 
 
40
 
 
41
 
class TestBase(TestCase):
42
 
    def check_read_write(self, k):
43
 
        """Check the weave k can be written & re-read."""
44
 
        from tempfile import TemporaryFile
45
 
        tf = TemporaryFile()
46
 
 
47
 
        write_weave(k, tf)
48
 
        tf.seek(0)
49
 
        k2 = read_weave(tf)
50
 
 
51
 
        if k != k2:
52
 
            tf.seek(0)
53
 
            self.log('serialized weave:')
54
 
            self.log(tf.read())
55
 
 
56
 
            self.log('')
57
 
            self.log('parents: %s' % (k._parents == k2._parents))
58
 
            self.log('         %r' % k._parents)
59
 
            self.log('         %r' % k2._parents)
60
 
            self.log('')
61
 
            self.fail('read/write check failed')
62
 
 
63
 
 
64
 
class WeaveContains(TestBase):
65
 
    """Weave __contains__ operator"""
66
 
    def runTest(self):
67
 
        k = Weave()
68
 
        self.assertFalse('foo' in k)
69
 
        k.add('foo', [], TEXT_1)
70
 
        self.assertTrue('foo' in k)
71
 
 
72
 
 
73
 
class Easy(TestBase):
74
 
    def runTest(self):
75
 
        k = Weave()
76
 
 
77
 
 
78
 
class StoreText(TestBase):
79
 
    """Store and retrieve a simple text."""
80
 
    def runTest(self):
81
 
        k = Weave()
82
 
        idx = k.add('text0', [], TEXT_0)
83
 
        self.assertEqual(k.get(idx), TEXT_0)
84
 
        self.assertEqual(idx, 0)
85
 
 
86
 
 
87
 
class AnnotateOne(TestBase):
88
 
    def runTest(self):
89
 
        k = Weave()
90
 
        k.add('text0', [], TEXT_0)
91
 
        self.assertEqual(k.annotate(0),
92
 
                         [(0, TEXT_0[0])])
93
 
 
94
 
 
95
 
class StoreTwo(TestBase):
96
 
    def runTest(self):
97
 
        k = Weave()
98
 
 
99
 
        idx = k.add('text0', [], TEXT_0)
100
 
        self.assertEqual(idx, 0)
101
 
 
102
 
        idx = k.add('text1', [], TEXT_1)
103
 
        self.assertEqual(idx, 1)
104
 
 
105
 
        self.assertEqual(k.get(0), TEXT_0)
106
 
        self.assertEqual(k.get(1), TEXT_1)
107
 
 
108
 
 
109
 
class AddWithGivenSha(TestBase):
110
 
    def runTest(self):
111
 
        """Add with caller-supplied SHA-1"""
112
 
        k = Weave()
113
 
 
114
 
        t = 'text0'
115
 
        k.add('text0', [], [t], sha1=sha_string(t))
116
 
 
117
 
 
118
 
class GetSha1(TestBase):
119
 
    def test_get_sha1(self):
120
 
        k = Weave()
121
 
        k.add('text0', [], 'text0')
122
 
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
123
 
                         k.get_sha1('text0'))
124
 
        self.assertRaises(errors.WeaveRevisionNotPresent,
125
 
                          k.get_sha1, 0)
126
 
        self.assertRaises(errors.WeaveRevisionNotPresent,
127
 
                          k.get_sha1, 'text1')
128
 
                        
129
 
 
130
 
class InvalidAdd(TestBase):
131
 
    """Try to use invalid version number during add."""
132
 
    def runTest(self):
133
 
        k = Weave()
134
 
 
135
 
        self.assertRaises(IndexError,
136
 
                          k.add,
137
 
                          'text0',
138
 
                          [69],
139
 
                          ['new text!'])
140
 
 
141
 
 
142
 
class RepeatedAdd(TestBase):
143
 
    """Add the same version twice; harmless."""
144
 
    def runTest(self):
145
 
        k = Weave()
146
 
        idx = k.add('text0', [], TEXT_0)
147
 
        idx2 = k.add('text0', [], TEXT_0)
148
 
        self.assertEqual(idx, idx2)
149
 
 
150
 
 
151
 
class InvalidRepeatedAdd(TestBase):
152
 
    def runTest(self):
153
 
        k = Weave()
154
 
        idx = k.add('text0', [], TEXT_0)
155
 
        self.assertRaises(WeaveError,
156
 
                          k.add,
157
 
                          'text0',
158
 
                          [],
159
 
                          ['not the same text'])
160
 
        self.assertRaises(WeaveError,
161
 
                          k.add,
162
 
                          'text0',
163
 
                          [12],         # not the right parents
164
 
                          TEXT_0)
165
 
        
166
 
 
167
 
class InsertLines(TestBase):
168
 
    """Store a revision that adds one line to the original.
169
 
 
170
 
    Look at the annotations to make sure that the first line is matched
171
 
    and not stored repeatedly."""
172
 
    def runTest(self):
173
 
        k = Weave()
174
 
 
175
 
        k.add('text0', [], ['line 1'])
176
 
        k.add('text1', [0], ['line 1', 'line 2'])
177
 
 
178
 
        self.assertEqual(k.annotate(0),
179
 
                         [(0, 'line 1')])
180
 
 
181
 
        self.assertEqual(k.get(1),
182
 
                         ['line 1',
183
 
                          'line 2'])
184
 
 
185
 
        self.assertEqual(k.annotate(1),
186
 
                         [(0, 'line 1'),
187
 
                          (1, 'line 2')])
188
 
 
189
 
        k.add('text2', [0], ['line 1', 'diverged line'])
190
 
 
191
 
        self.assertEqual(k.annotate(2),
192
 
                         [(0, 'line 1'),
193
 
                          (2, 'diverged line')])
194
 
 
195
 
        text3 = ['line 1', 'middle line', 'line 2']
196
 
        k.add('text3',
197
 
              [0, 1],
198
 
              text3)
199
 
 
200
 
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
201
 
 
202
 
        self.log("k._weave=" + pformat(k._weave))
203
 
 
204
 
        self.assertEqual(k.annotate(3),
205
 
                         [(0, 'line 1'),
206
 
                          (3, 'middle line'),
207
 
                          (1, 'line 2')])
208
 
 
209
 
        # now multiple insertions at different places
210
 
        k.add('text4',
211
 
              [0, 1, 3],
212
 
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
213
 
 
214
 
        self.assertEqual(k.annotate(4), 
215
 
                         [(0, 'line 1'),
216
 
                          (4, 'aaa'),
217
 
                          (3, 'middle line'),
218
 
                          (4, 'bbb'),
219
 
                          (1, 'line 2'),
220
 
                          (4, 'ccc')])
221
 
 
222
 
 
223
 
class DeleteLines(TestBase):
224
 
    """Deletion of lines from existing text.
225
 
 
226
 
    Try various texts all based on a common ancestor."""
227
 
    def runTest(self):
228
 
        k = Weave()
229
 
 
230
 
        base_text = ['one', 'two', 'three', 'four']
231
 
 
232
 
        k.add('text0', [], base_text)
233
 
        
234
 
        texts = [['one', 'two', 'three'],
235
 
                 ['two', 'three', 'four'],
236
 
                 ['one', 'four'],
237
 
                 ['one', 'two', 'three', 'four'],
238
 
                 ]
239
 
 
240
 
        i = 1
241
 
        for t in texts:
242
 
            ver = k.add('text%d' % i,
243
 
                        [0], t)
244
 
            i += 1
245
 
 
246
 
        self.log('final weave:')
247
 
        self.log('k._weave=' + pformat(k._weave))
248
 
 
249
 
        for i in range(len(texts)):
250
 
            self.assertEqual(k.get(i+1),
251
 
                             texts[i])
252
 
 
253
 
 
254
 
class SuicideDelete(TestBase):
255
 
    """Invalid weave which tries to add and delete simultaneously."""
256
 
    def runTest(self):
257
 
        k = Weave()
258
 
 
259
 
        k._parents = [(),
260
 
                ]
261
 
        k._weave = [('{', 0),
262
 
                'first line',
263
 
                ('[', 0),
264
 
                'deleted in 0',
265
 
                (']', 0),
266
 
                ('}', 0),
267
 
                ]
268
 
        ################################### SKIPPED
269
 
        # Weave.get doesn't trap this anymore
270
 
        return 
271
 
 
272
 
        self.assertRaises(WeaveFormatError,
273
 
                          k.get,
274
 
                          0)        
275
 
 
276
 
 
277
 
class CannedDelete(TestBase):
278
 
    """Unpack canned weave with deleted lines."""
279
 
    def runTest(self):
280
 
        k = Weave()
281
 
 
282
 
        k._parents = [(),
283
 
                frozenset([0]),
284
 
                ]
285
 
        k._weave = [('{', 0),
286
 
                'first line',
287
 
                ('[', 1),
288
 
                'line to be deleted',
289
 
                (']', 1),
290
 
                'last line',
291
 
                ('}', 0),
292
 
                ]
293
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
294
 
                  , sha_string('first linelast line')]
295
 
 
296
 
        self.assertEqual(k.get(0),
297
 
                         ['first line',
298
 
                          'line to be deleted',
299
 
                          'last line',
300
 
                          ])
301
 
 
302
 
        self.assertEqual(k.get(1),
303
 
                         ['first line',
304
 
                          'last line',
305
 
                          ])
306
 
 
307
 
 
308
 
class CannedReplacement(TestBase):
309
 
    """Unpack canned weave with deleted lines."""
310
 
    def runTest(self):
311
 
        k = Weave()
312
 
 
313
 
        k._parents = [frozenset(),
314
 
                frozenset([0]),
315
 
                ]
316
 
        k._weave = [('{', 0),
317
 
                'first line',
318
 
                ('[', 1),
319
 
                'line to be deleted',
320
 
                (']', 1),
321
 
                ('{', 1),
322
 
                'replacement line',                
323
 
                ('}', 1),
324
 
                'last line',
325
 
                ('}', 0),
326
 
                ]
327
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
328
 
                  , sha_string('first linereplacement linelast line')]
329
 
 
330
 
        self.assertEqual(k.get(0),
331
 
                         ['first line',
332
 
                          'line to be deleted',
333
 
                          'last line',
334
 
                          ])
335
 
 
336
 
        self.assertEqual(k.get(1),
337
 
                         ['first line',
338
 
                          'replacement line',
339
 
                          'last line',
340
 
                          ])
341
 
 
342
 
 
343
 
class BadWeave(TestBase):
344
 
    """Test that we trap an insert which should not occur."""
345
 
    def runTest(self):
346
 
        k = Weave()
347
 
 
348
 
        k._parents = [frozenset(),
349
 
                ]
350
 
        k._weave = ['bad line',
351
 
                ('{', 0),
352
 
                'foo {',
353
 
                ('{', 1),
354
 
                '  added in version 1',
355
 
                ('{', 2),
356
 
                '  added in v2',
357
 
                ('}', 2),
358
 
                '  also from v1',
359
 
                ('}', 1),
360
 
                '}',
361
 
                ('}', 0)]
362
 
 
363
 
        ################################### SKIPPED
364
 
        # Weave.get doesn't trap this anymore
365
 
        return 
366
 
 
367
 
 
368
 
        self.assertRaises(WeaveFormatError,
369
 
                          k.get,
370
 
                          0)
371
 
 
372
 
 
373
 
class BadInsert(TestBase):
374
 
    """Test that we trap an insert which should not occur."""
375
 
    def runTest(self):
376
 
        k = Weave()
377
 
 
378
 
        k._parents = [frozenset(),
379
 
                frozenset([0]),
380
 
                frozenset([0]),
381
 
                frozenset([0,1,2]),
382
 
                ]
383
 
        k._weave = [('{', 0),
384
 
                'foo {',
385
 
                ('{', 1),
386
 
                '  added in version 1',
387
 
                ('{', 1),
388
 
                '  more in 1',
389
 
                ('}', 1),
390
 
                ('}', 1),
391
 
                ('}', 0)]
392
 
 
393
 
 
394
 
        # this is not currently enforced by get
395
 
        return  ##########################################
396
 
 
397
 
        self.assertRaises(WeaveFormatError,
398
 
                          k.get,
399
 
                          0)
400
 
 
401
 
        self.assertRaises(WeaveFormatError,
402
 
                          k.get,
403
 
                          1)
404
 
 
405
 
 
406
 
class InsertNested(TestBase):
407
 
    """Insertion with nested instructions."""
408
 
    def runTest(self):
409
 
        k = Weave()
410
 
 
411
 
        k._parents = [frozenset(),
412
 
                frozenset([0]),
413
 
                frozenset([0]),
414
 
                frozenset([0,1,2]),
415
 
                ]
416
 
        k._weave = [('{', 0),
417
 
                'foo {',
418
 
                ('{', 1),
419
 
                '  added in version 1',
420
 
                ('{', 2),
421
 
                '  added in v2',
422
 
                ('}', 2),
423
 
                '  also from v1',
424
 
                ('}', 1),
425
 
                '}',
426
 
                ('}', 0)]
427
 
 
428
 
        k._sha1s = [sha_string('foo {}')
429
 
                  , sha_string('foo {  added in version 1  also from v1}')
430
 
                  , sha_string('foo {  added in v2}')
431
 
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
432
 
                  ]
433
 
 
434
 
        self.assertEqual(k.get(0),
435
 
                         ['foo {',
436
 
                          '}'])
437
 
 
438
 
        self.assertEqual(k.get(1),
439
 
                         ['foo {',
440
 
                          '  added in version 1',
441
 
                          '  also from v1',
442
 
                          '}'])
443
 
                       
444
 
        self.assertEqual(k.get(2),
445
 
                         ['foo {',
446
 
                          '  added in v2',
447
 
                          '}'])
448
 
 
449
 
        self.assertEqual(k.get(3),
450
 
                         ['foo {',
451
 
                          '  added in version 1',
452
 
                          '  added in v2',
453
 
                          '  also from v1',
454
 
                          '}'])
455
 
                         
456
 
 
457
 
class DeleteLines2(TestBase):
458
 
    """Test recording revisions that delete lines.
459
 
 
460
 
    This relies on the weave having a way to represent lines knocked
461
 
    out by a later revision."""
462
 
    def runTest(self):
463
 
        k = Weave()
464
 
 
465
 
        k.add('text0', [], ["line the first",
466
 
                   "line 2",
467
 
                   "line 3",
468
 
                   "fine"])
469
 
 
470
 
        self.assertEqual(len(k.get(0)), 4)
471
 
 
472
 
        k.add('text1', [0], ["line the first",
473
 
                   "fine"])
474
 
 
475
 
        self.assertEqual(k.get(1),
476
 
                         ["line the first",
477
 
                          "fine"])
478
 
 
479
 
        self.assertEqual(k.annotate(1),
480
 
                         [(0, "line the first"),
481
 
                          (0, "fine")])
482
 
 
483
 
 
484
 
class IncludeVersions(TestBase):
485
 
    """Check texts that are stored across multiple revisions.
486
 
 
487
 
    Here we manually create a weave with particular encoding and make
488
 
    sure it unpacks properly.
489
 
 
490
 
    Text 0 includes nothing; text 1 includes text 0 and adds some
491
 
    lines.
492
 
    """
493
 
 
494
 
    def runTest(self):
495
 
        k = Weave()
496
 
 
497
 
        k._parents = [frozenset(), frozenset([0])]
498
 
        k._weave = [('{', 0),
499
 
                "first line",
500
 
                ('}', 0),
501
 
                ('{', 1),
502
 
                "second line",
503
 
                ('}', 1)]
504
 
 
505
 
        k._sha1s = [sha_string('first line')
506
 
                  , sha_string('first linesecond line')]
507
 
 
508
 
        self.assertEqual(k.get(1),
509
 
                         ["first line",
510
 
                          "second line"])
511
 
 
512
 
        self.assertEqual(k.get(0),
513
 
                         ["first line"])
514
 
 
515
 
 
516
 
class DivergedIncludes(TestBase):
517
 
    """Weave with two diverged texts based on version 0.
518
 
    """
519
 
    def runTest(self):
520
 
        k = Weave()
521
 
 
522
 
        k._parents = [frozenset(),
523
 
                frozenset([0]),
524
 
                frozenset([0]),
525
 
                ]
526
 
        k._weave = [('{', 0),
527
 
                "first line",
528
 
                ('}', 0),
529
 
                ('{', 1),
530
 
                "second line",
531
 
                ('}', 1),
532
 
                ('{', 2),
533
 
                "alternative second line",
534
 
                ('}', 2),                
535
 
                ]
536
 
 
537
 
        k._sha1s = [sha_string('first line')
538
 
                  , sha_string('first linesecond line')
539
 
                  , sha_string('first linealternative second line')]
540
 
 
541
 
        self.assertEqual(k.get(0),
542
 
                         ["first line"])
543
 
 
544
 
        self.assertEqual(k.get(1),
545
 
                         ["first line",
546
 
                          "second line"])
547
 
 
548
 
        self.assertEqual(k.get(2),
549
 
                         ["first line",
550
 
                          "alternative second line"])
551
 
 
552
 
        self.assertEqual(list(k.inclusions([2])),
553
 
                         [0, 2])
554
 
 
555
 
 
556
 
class ReplaceLine(TestBase):
557
 
    def runTest(self):
558
 
        k = Weave()
559
 
 
560
 
        text0 = ['cheddar', 'stilton', 'gruyere']
561
 
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
562
 
        
563
 
        k.add('text0', [], text0)
564
 
        k.add('text1', [0], text1)
565
 
 
566
 
        self.log('k._weave=' + pformat(k._weave))
567
 
 
568
 
        self.assertEqual(k.get(0), text0)
569
 
        self.assertEqual(k.get(1), text1)
570
 
 
571
 
 
572
 
class Merge(TestBase):
573
 
    """Storage of versions that merge diverged parents"""
574
 
    def runTest(self):
575
 
        k = Weave()
576
 
 
577
 
        texts = [['header'],
578
 
                 ['header', '', 'line from 1'],
579
 
                 ['header', '', 'line from 2', 'more from 2'],
580
 
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
581
 
                 ]
582
 
 
583
 
        k.add('text0', [], texts[0])
584
 
        k.add('text1', [0], texts[1])
585
 
        k.add('text2', [0], texts[2])
586
 
        k.add('merge', [0, 1, 2], texts[3])
587
 
 
588
 
        for i, t in enumerate(texts):
589
 
            self.assertEqual(k.get(i), t)
590
 
 
591
 
        self.assertEqual(k.annotate(3),
592
 
                         [(0, 'header'),
593
 
                          (1, ''),
594
 
                          (1, 'line from 1'),
595
 
                          (3, 'fixup line'),
596
 
                          (2, 'line from 2'),
597
 
                          ])
598
 
 
599
 
        self.assertEqual(list(k.inclusions([3])),
600
 
                         [0, 1, 2, 3])
601
 
 
602
 
        self.log('k._weave=' + pformat(k._weave))
603
 
 
604
 
        self.check_read_write(k)
605
 
 
606
 
 
607
 
class Conflicts(TestBase):
608
 
    """Test detection of conflicting regions during a merge.
609
 
 
610
 
    A base version is inserted, then two descendents try to
611
 
    insert different lines in the same place.  These should be
612
 
    reported as a possible conflict and forwarded to the user."""
613
 
    def runTest(self):
614
 
        return  # NOT RUN
615
 
        k = Weave()
616
 
 
617
 
        k.add([], ['aaa', 'bbb'])
618
 
        k.add([0], ['aaa', '111', 'bbb'])
619
 
        k.add([1], ['aaa', '222', 'bbb'])
620
 
 
621
 
        merged = k.merge([1, 2])
622
 
 
623
 
        self.assertEquals([[['aaa']],
624
 
                           [['111'], ['222']],
625
 
                           [['bbb']]])
626
 
 
627
 
 
628
 
class NonConflict(TestBase):
629
 
    """Two descendants insert compatible changes.
630
 
 
631
 
    No conflict should be reported."""
632
 
    def runTest(self):
633
 
        return  # NOT RUN
634
 
        k = Weave()
635
 
 
636
 
        k.add([], ['aaa', 'bbb'])
637
 
        k.add([0], ['111', 'aaa', 'ccc', 'bbb'])
638
 
        k.add([1], ['aaa', 'ccc', 'bbb', '222'])
639
 
 
640
 
 
641
 
class AutoMerge(TestBase):
642
 
    def runTest(self):
643
 
        k = Weave()
644
 
 
645
 
        texts = [['header', 'aaa', 'bbb'],
646
 
                 ['header', 'aaa', 'line from 1', 'bbb'],
647
 
                 ['header', 'aaa', 'bbb', 'line from 2', 'more from 2'],
648
 
                 ]
649
 
 
650
 
        k.add('text0', [], texts[0])
651
 
        k.add('text1', [0], texts[1])
652
 
        k.add('text2', [0], texts[2])
653
 
 
654
 
        self.log('k._weave=' + pformat(k._weave))
655
 
 
656
 
        m = list(k.mash_iter([0, 1, 2]))
657
 
 
658
 
        self.assertEqual(m,
659
 
                         ['header', 'aaa',
660
 
                          'line from 1',
661
 
                          'bbb',
662
 
                          'line from 2', 'more from 2'])
663
 
 
664
 
 
665
 
class Khayyam(TestBase):
666
 
    """Test changes to multi-line texts, and read/write"""
667
 
    def runTest(self):
668
 
        rawtexts = [
669
 
            """A Book of Verses underneath the Bough,
670
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
671
 
            Beside me singing in the Wilderness --
672
 
            Oh, Wilderness were Paradise enow!""",
673
 
            
674
 
            """A Book of Verses underneath the Bough,
675
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
676
 
            Beside me singing in the Wilderness --
677
 
            Oh, Wilderness were Paradise now!""",
678
 
 
679
 
            """A Book of poems underneath the tree,
680
 
            A Jug of Wine, a Loaf of Bread,
681
 
            and Thou
682
 
            Beside me singing in the Wilderness --
683
 
            Oh, Wilderness were Paradise now!
684
 
 
685
 
            -- O. Khayyam""",
686
 
 
687
 
            """A Book of Verses underneath the Bough,
688
 
            A Jug of Wine, a Loaf of Bread,
689
 
            and Thou
690
 
            Beside me singing in the Wilderness --
691
 
            Oh, Wilderness were Paradise now!""",
692
 
            ]
693
 
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
694
 
 
695
 
        k = Weave()
696
 
        parents = set()
697
 
        i = 0
698
 
        for t in texts:
699
 
            ver = k.add('text%d' % i,
700
 
                        list(parents), t)
701
 
            parents.add(ver)
702
 
            i += 1
703
 
 
704
 
        self.log("k._weave=" + pformat(k._weave))
705
 
 
706
 
        for i, t in enumerate(texts):
707
 
            self.assertEqual(k.get(i), t)
708
 
 
709
 
        self.check_read_write(k)
710
 
 
711
 
 
712
 
class MergeCases(TestBase):
713
 
    def doMerge(self, base, a, b, mp):
714
 
        from cStringIO import StringIO
715
 
        from textwrap import dedent
716
 
 
717
 
        def addcrlf(x):
718
 
            return x + '\n'
719
 
        
720
 
        w = Weave()
721
 
        w.add('text0', [], map(addcrlf, base))
722
 
        w.add('text1', [0], map(addcrlf, a))
723
 
        w.add('text2', [0], map(addcrlf, b))
724
 
 
725
 
        self.log('weave is:')
726
 
        tmpf = StringIO()
727
 
        write_weave(w, tmpf)
728
 
        self.log(tmpf.getvalue())
729
 
 
730
 
        self.log('merge plan:')
731
 
        p = list(w.plan_merge(1, 2))
732
 
        for state, line in p:
733
 
            if line:
734
 
                self.log('%12s | %s' % (state, line[:-1]))
735
 
 
736
 
        self.log('merge:')
737
 
        mt = StringIO()
738
 
        mt.writelines(w.weave_merge(p))
739
 
        mt.seek(0)
740
 
        self.log(mt.getvalue())
741
 
 
742
 
        mp = map(addcrlf, mp)
743
 
        self.assertEqual(mt.readlines(), mp)
744
 
        
745
 
        
746
 
    def testOneInsert(self):
747
 
        self.doMerge([],
748
 
                     ['aa'],
749
 
                     [],
750
 
                     ['aa'])
751
 
 
752
 
    def testSeparateInserts(self):
753
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
754
 
                     ['aaa', 'xxx', 'bbb', 'ccc'],
755
 
                     ['aaa', 'bbb', 'yyy', 'ccc'],
756
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
757
 
 
758
 
    def testSameInsert(self):
759
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
760
 
                     ['aaa', 'xxx', 'bbb', 'ccc'],
761
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
762
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
763
 
 
764
 
    def testOverlappedInsert(self):
765
 
        self.doMerge(['aaa', 'bbb'],
766
 
                     ['aaa', 'xxx', 'yyy', 'bbb'],
767
 
                     ['aaa', 'xxx', 'bbb'],
768
 
                     ['aaa', '<<<<<<<', 'xxx', 'yyy', '=======', 'xxx', 
769
 
                      '>>>>>>>', 'bbb'])
770
 
 
771
 
        # really it ought to reduce this to 
772
 
        # ['aaa', 'xxx', 'yyy', 'bbb']
773
 
 
774
 
 
775
 
    def testClashReplace(self):
776
 
        self.doMerge(['aaa'],
777
 
                     ['xxx'],
778
 
                     ['yyy', 'zzz'],
779
 
                     ['<<<<<<<', 'xxx', '=======', 'yyy', 'zzz', 
780
 
                      '>>>>>>>'])
781
 
 
782
 
    def testNonClashInsert(self):
783
 
        self.doMerge(['aaa'],
784
 
                     ['xxx', 'aaa'],
785
 
                     ['yyy', 'zzz'],
786
 
                     ['<<<<<<<', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
787
 
                      '>>>>>>>'])
788
 
 
789
 
        self.doMerge(['aaa'],
790
 
                     ['aaa'],
791
 
                     ['yyy', 'zzz'],
792
 
                     ['yyy', 'zzz'])
793
 
 
794
 
 
795
 
    def testDeleteAndModify(self):
796
 
        """Clashing delete and modification.
797
 
 
798
 
        If one side modifies a region and the other deletes it then
799
 
        there should be a conflict with one side blank.
800
 
        """
801
 
 
802
 
        #######################################
803
 
        # skippd, not working yet
804
 
        return
805
 
        
806
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
807
 
                     ['aaa', 'ddd', 'ccc'],
808
 
                     ['aaa', 'ccc'],
809
 
                     ['<<<<<<<<', 'aaa', '=======', '>>>>>>>', 'ccc'])
810
 
 
811
 
 
812
 
class JoinWeavesTests(TestBase):
813
 
    def setUp(self):
814
 
        super(JoinWeavesTests, self).setUp()
815
 
        self.weave1 = Weave()
816
 
        self.lines1 = ['hello\n']
817
 
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
818
 
        self.weave1.add('v1', [], self.lines1)
819
 
        self.weave1.add('v2', [0], ['hello\n', 'world\n'])
820
 
        self.weave1.add('v3', [1], self.lines3)
821
 
        
822
 
    def test_join_empty(self):
823
 
        """Join two empty weaves."""
824
 
        eq = self.assertEqual
825
 
        w1 = Weave()
826
 
        w2 = Weave()
827
 
        w1.join(w2)
828
 
        eq(w1.numversions(), 0)
829
 
        
830
 
    def test_join_empty_to_nonempty(self):
831
 
        """Join empty weave onto nonempty."""
832
 
        self.weave1.join(Weave())
833
 
        self.assertEqual(len(self.weave1), 3)
834
 
 
835
 
    def test_join_unrelated(self):
836
 
        """Join two weaves with no history in common."""
837
 
        wb = Weave()
838
 
        wb.add('b1', [], ['line from b\n'])
839
 
        w1 = self.weave1
840
 
        w1.join(wb)
841
 
        eq = self.assertEqual
842
 
        eq(len(w1), 4)
843
 
        eq(sorted(list(w1.iter_names())),
844
 
           ['b1', 'v1', 'v2', 'v3'])
845
 
 
846
 
    def test_join_related(self):
847
 
        wa = self.weave1.copy()
848
 
        wb = self.weave1.copy()
849
 
        wa.add('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
850
 
        wb.add('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
851
 
        eq = self.assertEquals
852
 
        eq(len(wa), 4)
853
 
        eq(len(wb), 4)
854
 
        wa.join(wb)
855
 
        eq(len(wa), 5)
856
 
        eq(wa.get_lines('b1'),
857
 
           ['hello\n', 'pale blue\n', 'world\n'])
858
 
 
859
 
    def test_join_parent_disagreement(self):
860
 
        """Cannot join weaves with different parents for a version."""
861
 
        wa = Weave()
862
 
        wb = Weave()
863
 
        wa.add('v1', [], ['hello\n'])
864
 
        wb.add('v0', [], [])
865
 
        wb.add('v1', ['v0'], ['hello\n'])
866
 
        self.assertRaises(WeaveError,
867
 
                          wa.join, wb)
868
 
 
869
 
    def test_join_text_disagreement(self):
870
 
        """Cannot join weaves with different texts for a version."""
871
 
        wa = Weave()
872
 
        wb = Weave()
873
 
        wa.add('v1', [], ['hello\n'])
874
 
        wb.add('v1', [], ['not\n', 'hello\n'])
875
 
        self.assertRaises(WeaveError,
876
 
                          wa.join, wb)
877
 
 
878
 
    def test_join_unordered(self):
879
 
        """Join weaves where indexes differ.
880
 
        
881
 
        The source weave contains a different version at index 0."""
882
 
        wa = self.weave1.copy()
883
 
        wb = Weave()
884
 
        wb.add('x1', [], ['line from x1\n'])
885
 
        wb.add('v1', [], ['hello\n'])
886
 
        wb.add('v2', ['v1'], ['hello\n', 'world\n'])
887
 
        wa.join(wb)
888
 
        eq = self.assertEquals
889
 
        eq(sorted(wa.iter_names()), ['v1', 'v2', 'v3', 'x1',])
890
 
        eq(wa.get_text('x1'), 'line from x1\n')
891
 
 
892
 
 
893
 
class Corruption(TestCase):
894
 
 
895
 
    def test_detection(self):
896
 
        # Test weaves detect corruption.
897
 
        #
898
 
        # Weaves contain a checksum of their texts.
899
 
        # When a text is extracted, this checksum should be
900
 
        # verified.
901
 
 
902
 
        w = Weave()
903
 
        w.add('v1', [], ['hello\n'])
904
 
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
905
 
 
906
 
        # We are going to invasively corrupt the text
907
 
        # Make sure the internals of weave are the same
908
 
        self.assertEqual([('{', 0)
909
 
                        , 'hello\n'
910
 
                        , ('}', None)
911
 
                        , ('{', 1)
912
 
                        , 'there\n'
913
 
                        , ('}', None)
914
 
                        ], w._weave)
915
 
 
916
 
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
917
 
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
918
 
                        ], w._sha1s)
919
 
        w.check()
920
 
 
921
 
        # Corrupted
922
 
        w._weave[4] = 'There\n'
923
 
 
924
 
        self.assertEqual('hello\n', w.get_text('v1'))
925
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
926
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
927
 
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
928
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
929
 
 
930
 
        # Corrected
931
 
        w._weave[4] = 'there\n'
932
 
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
933
 
 
934
 
        #Invalid checksum, first digit changed
935
 
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
936
 
 
937
 
        self.assertEqual('hello\n', w.get_text('v1'))
938
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
939
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
940
 
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
941
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
942
 
 
943
 
    def test_written_detection(self):
944
 
        # Test detection of weave file corruption.
945
 
        #
946
 
        # Make sure that we can detect if a weave file has
947
 
        # been corrupted. This doesn't test all forms of corruption,
948
 
        # but it at least helps verify the data you get, is what you want.
949
 
        from cStringIO import StringIO
950
 
 
951
 
        w = Weave()
952
 
        w.add('v1', [], ['hello\n'])
953
 
        w.add('v2', ['v1'], ['hello\n', 'there\n'])
954
 
 
955
 
        tmpf = StringIO()
956
 
        write_weave(w, tmpf)
957
 
 
958
 
        # Because we are corrupting, we need to make sure we have the exact text
959
 
        self.assertEquals('# bzr weave file v5\n'
960
 
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
961
 
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
962
 
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
963
 
                          tmpf.getvalue())
964
 
 
965
 
        # Change a single letter
966
 
        tmpf = StringIO('# bzr weave file v5\n'
967
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
968
 
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
969
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
970
 
 
971
 
        w = read_weave(tmpf)
972
 
 
973
 
        self.assertEqual('hello\n', w.get_text('v1'))
974
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
975
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
976
 
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
977
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
978
 
 
979
 
        # Change the sha checksum
980
 
        tmpf = StringIO('# bzr weave file v5\n'
981
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
982
 
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
983
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
984
 
 
985
 
        w = read_weave(tmpf)
986
 
 
987
 
        self.assertEqual('hello\n', w.get_text('v1'))
988
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
989
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
990
 
        self.assertRaises(errors.WeaveInvalidChecksum, list, w.get_iter('v2'))
991
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
992
 
 
993