~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: John Arbash Meinel
  • Date: 2006-03-08 14:31:23 UTC
  • mfrom: (1598 +trunk)
  • mto: (1685.1.1 bzr-encoding)
  • mto: This revision was merged to the branch mainline in revision 1752.
  • Revision ID: john@arbash-meinel.com-20060308143123-448308b0db4de410
[merge] bzr.dev 1573, lots of updates

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python2.4
 
2
 
 
3
# Copyright (C) 2005 by Canonical Ltd
 
4
 
 
5
# This program is free software; you can redistribute it and/or modify
 
6
# it under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation; either version 2 of the License, or
 
8
# (at your option) any later version.
 
9
 
 
10
# This program is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
# GNU General Public License for more details.
 
14
 
 
15
# You should have received a copy of the GNU General Public License
 
16
# along with this program; if not, write to the Free Software
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
 
 
20
# TODO: tests regarding version names
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
22
#       if it fails.
 
23
 
 
24
"""test suite for weave algorithm"""
 
25
 
 
26
from pprint import pformat
 
27
 
 
28
import bzrlib.errors as errors
 
29
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
 
30
from bzrlib.weavefile import write_weave, read_weave
 
31
from bzrlib.tests import TestCase
 
32
from bzrlib.osutils import sha_string
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
    def check_read_write(self, k):
 
43
        """Check the weave k can be written & re-read."""
 
44
        from tempfile import TemporaryFile
 
45
        tf = TemporaryFile()
 
46
 
 
47
        write_weave(k, tf)
 
48
        tf.seek(0)
 
49
        k2 = read_weave(tf)
 
50
 
 
51
        if k != k2:
 
52
            tf.seek(0)
 
53
            self.log('serialized weave:')
 
54
            self.log(tf.read())
 
55
 
 
56
            self.log('')
 
57
            self.log('parents: %s' % (k._parents == k2._parents))
 
58
            self.log('         %r' % k._parents)
 
59
            self.log('         %r' % k2._parents)
 
60
            self.log('')
 
61
            self.fail('read/write check failed')
 
62
 
 
63
 
 
64
class WeaveContains(TestBase):
 
65
    """Weave __contains__ operator"""
 
66
    def runTest(self):
 
67
        k = Weave()
 
68
        self.assertFalse('foo' in k)
 
69
        k.add_lines('foo', [], TEXT_1)
 
70
        self.assertTrue('foo' in k)
 
71
 
 
72
 
 
73
class Easy(TestBase):
 
74
    def runTest(self):
 
75
        k = Weave()
 
76
 
 
77
 
 
78
class StoreText(TestBase):
 
79
    """Store and retrieve a simple text."""
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        idx = k.add_lines('text0', [], TEXT_0)
 
83
        self.assertEqual(k.get_lines(idx), TEXT_0)
 
84
        self.assertEqual(idx, 0)
 
85
 
 
86
 
 
87
class AnnotateOne(TestBase):
 
88
    def runTest(self):
 
89
        k = Weave()
 
90
        k.add_lines('text0', [], TEXT_0)
 
91
        self.assertEqual(k.annotate('text0'),
 
92
                         [('text0', TEXT_0[0])])
 
93
 
 
94
 
 
95
class StoreTwo(TestBase):
 
96
    def runTest(self):
 
97
        k = Weave()
 
98
 
 
99
        idx = k.add_lines('text0', [], TEXT_0)
 
100
        self.assertEqual(idx, 0)
 
101
 
 
102
        idx = k.add_lines('text1', [], TEXT_1)
 
103
        self.assertEqual(idx, 1)
 
104
 
 
105
        self.assertEqual(k.get_lines(0), TEXT_0)
 
106
        self.assertEqual(k.get_lines(1), TEXT_1)
 
107
 
 
108
 
 
109
class GetSha1(TestBase):
 
110
    def test_get_sha1(self):
 
111
        k = Weave()
 
112
        k.add_lines('text0', [], 'text0')
 
113
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
114
                         k.get_sha1('text0'))
 
115
        self.assertRaises(errors.RevisionNotPresent,
 
116
                          k.get_sha1, 0)
 
117
        self.assertRaises(errors.RevisionNotPresent,
 
118
                          k.get_sha1, 'text1')
 
119
                        
 
120
 
 
121
class InvalidAdd(TestBase):
 
122
    """Try to use invalid version number during add."""
 
123
    def runTest(self):
 
124
        k = Weave()
 
125
 
 
126
        self.assertRaises(errors.RevisionNotPresent,
 
127
                          k.add_lines,
 
128
                          'text0',
 
129
                          ['69'],
 
130
                          ['new text!'])
 
131
 
 
132
 
 
133
class RepeatedAdd(TestBase):
 
134
    """Add the same version twice; harmless."""
 
135
    def runTest(self):
 
136
        k = Weave()
 
137
        idx = k.add_lines('text0', [], TEXT_0)
 
138
        idx2 = k.add_lines('text0', [], TEXT_0)
 
139
        self.assertEqual(idx, idx2)
 
140
 
 
141
 
 
142
class InvalidRepeatedAdd(TestBase):
 
143
    def runTest(self):
 
144
        k = Weave()
 
145
        k.add_lines('basis', [], TEXT_0)
 
146
        idx = k.add_lines('text0', [], TEXT_0)
 
147
        self.assertRaises(errors.RevisionAlreadyPresent,
 
148
                          k.add_lines,
 
149
                          'text0',
 
150
                          [],
 
151
                          ['not the same text'])
 
152
        self.assertRaises(errors.RevisionAlreadyPresent,
 
153
                          k.add_lines,
 
154
                          'text0',
 
155
                          ['basis'],         # not the right parents
 
156
                          TEXT_0)
 
157
        
 
158
 
 
159
class InsertLines(TestBase):
 
160
    """Store a revision that adds one line to the original.
 
161
 
 
162
    Look at the annotations to make sure that the first line is matched
 
163
    and not stored repeatedly."""
 
164
    def runTest(self):
 
165
        k = Weave()
 
166
 
 
167
        k.add_lines('text0', [], ['line 1'])
 
168
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
169
 
 
170
        self.assertEqual(k.annotate('text0'),
 
171
                         [('text0', 'line 1')])
 
172
 
 
173
        self.assertEqual(k.get_lines(1),
 
174
                         ['line 1',
 
175
                          'line 2'])
 
176
 
 
177
        self.assertEqual(k.annotate('text1'),
 
178
                         [('text0', 'line 1'),
 
179
                          ('text1', 'line 2')])
 
180
 
 
181
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
182
 
 
183
        self.assertEqual(k.annotate('text2'),
 
184
                         [('text0', 'line 1'),
 
185
                          ('text2', 'diverged line')])
 
186
 
 
187
        text3 = ['line 1', 'middle line', 'line 2']
 
188
        k.add_lines('text3',
 
189
              ['text0', 'text1'],
 
190
              text3)
 
191
 
 
192
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
193
 
 
194
        self.log("k._weave=" + pformat(k._weave))
 
195
 
 
196
        self.assertEqual(k.annotate('text3'),
 
197
                         [('text0', 'line 1'),
 
198
                          ('text3', 'middle line'),
 
199
                          ('text1', 'line 2')])
 
200
 
 
201
        # now multiple insertions at different places
 
202
        k.add_lines('text4',
 
203
              ['text0', 'text1', 'text3'],
 
204
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
205
 
 
206
        self.assertEqual(k.annotate('text4'), 
 
207
                         [('text0', 'line 1'),
 
208
                          ('text4', 'aaa'),
 
209
                          ('text3', 'middle line'),
 
210
                          ('text4', 'bbb'),
 
211
                          ('text1', 'line 2'),
 
212
                          ('text4', 'ccc')])
 
213
 
 
214
 
 
215
class DeleteLines(TestBase):
 
216
    """Deletion of lines from existing text.
 
217
 
 
218
    Try various texts all based on a common ancestor."""
 
219
    def runTest(self):
 
220
        k = Weave()
 
221
 
 
222
        base_text = ['one', 'two', 'three', 'four']
 
223
 
 
224
        k.add_lines('text0', [], base_text)
 
225
        
 
226
        texts = [['one', 'two', 'three'],
 
227
                 ['two', 'three', 'four'],
 
228
                 ['one', 'four'],
 
229
                 ['one', 'two', 'three', 'four'],
 
230
                 ]
 
231
 
 
232
        i = 1
 
233
        for t in texts:
 
234
            ver = k.add_lines('text%d' % i,
 
235
                        ['text0'], t)
 
236
            i += 1
 
237
 
 
238
        self.log('final weave:')
 
239
        self.log('k._weave=' + pformat(k._weave))
 
240
 
 
241
        for i in range(len(texts)):
 
242
            self.assertEqual(k.get_lines(i+1),
 
243
                             texts[i])
 
244
 
 
245
 
 
246
class SuicideDelete(TestBase):
 
247
    """Invalid weave which tries to add and delete simultaneously."""
 
248
    def runTest(self):
 
249
        k = Weave()
 
250
 
 
251
        k._parents = [(),
 
252
                ]
 
253
        k._weave = [('{', 0),
 
254
                'first line',
 
255
                ('[', 0),
 
256
                'deleted in 0',
 
257
                (']', 0),
 
258
                ('}', 0),
 
259
                ]
 
260
        ################################### SKIPPED
 
261
        # Weave.get doesn't trap this anymore
 
262
        return 
 
263
 
 
264
        self.assertRaises(WeaveFormatError,
 
265
                          k.get_lines,
 
266
                          0)        
 
267
 
 
268
 
 
269
class CannedDelete(TestBase):
 
270
    """Unpack canned weave with deleted lines."""
 
271
    def runTest(self):
 
272
        k = Weave()
 
273
 
 
274
        k._parents = [(),
 
275
                frozenset([0]),
 
276
                ]
 
277
        k._weave = [('{', 0),
 
278
                'first line',
 
279
                ('[', 1),
 
280
                'line to be deleted',
 
281
                (']', 1),
 
282
                'last line',
 
283
                ('}', 0),
 
284
                ]
 
285
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
286
                  , sha_string('first linelast line')]
 
287
 
 
288
        self.assertEqual(k.get_lines(0),
 
289
                         ['first line',
 
290
                          'line to be deleted',
 
291
                          'last line',
 
292
                          ])
 
293
 
 
294
        self.assertEqual(k.get_lines(1),
 
295
                         ['first line',
 
296
                          'last line',
 
297
                          ])
 
298
 
 
299
 
 
300
class CannedReplacement(TestBase):
 
301
    """Unpack canned weave with deleted lines."""
 
302
    def runTest(self):
 
303
        k = Weave()
 
304
 
 
305
        k._parents = [frozenset(),
 
306
                frozenset([0]),
 
307
                ]
 
308
        k._weave = [('{', 0),
 
309
                'first line',
 
310
                ('[', 1),
 
311
                'line to be deleted',
 
312
                (']', 1),
 
313
                ('{', 1),
 
314
                'replacement line',                
 
315
                ('}', 1),
 
316
                'last line',
 
317
                ('}', 0),
 
318
                ]
 
319
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
320
                  , sha_string('first linereplacement linelast line')]
 
321
 
 
322
        self.assertEqual(k.get_lines(0),
 
323
                         ['first line',
 
324
                          'line to be deleted',
 
325
                          'last line',
 
326
                          ])
 
327
 
 
328
        self.assertEqual(k.get_lines(1),
 
329
                         ['first line',
 
330
                          'replacement line',
 
331
                          'last line',
 
332
                          ])
 
333
 
 
334
 
 
335
class BadWeave(TestBase):
 
336
    """Test that we trap an insert which should not occur."""
 
337
    def runTest(self):
 
338
        k = Weave()
 
339
 
 
340
        k._parents = [frozenset(),
 
341
                ]
 
342
        k._weave = ['bad line',
 
343
                ('{', 0),
 
344
                'foo {',
 
345
                ('{', 1),
 
346
                '  added in version 1',
 
347
                ('{', 2),
 
348
                '  added in v2',
 
349
                ('}', 2),
 
350
                '  also from v1',
 
351
                ('}', 1),
 
352
                '}',
 
353
                ('}', 0)]
 
354
 
 
355
        ################################### SKIPPED
 
356
        # Weave.get doesn't trap this anymore
 
357
        return 
 
358
 
 
359
 
 
360
        self.assertRaises(WeaveFormatError,
 
361
                          k.get,
 
362
                          0)
 
363
 
 
364
 
 
365
class BadInsert(TestBase):
 
366
    """Test that we trap an insert which should not occur."""
 
367
    def runTest(self):
 
368
        k = Weave()
 
369
 
 
370
        k._parents = [frozenset(),
 
371
                frozenset([0]),
 
372
                frozenset([0]),
 
373
                frozenset([0,1,2]),
 
374
                ]
 
375
        k._weave = [('{', 0),
 
376
                'foo {',
 
377
                ('{', 1),
 
378
                '  added in version 1',
 
379
                ('{', 1),
 
380
                '  more in 1',
 
381
                ('}', 1),
 
382
                ('}', 1),
 
383
                ('}', 0)]
 
384
 
 
385
 
 
386
        # this is not currently enforced by get
 
387
        return  ##########################################
 
388
 
 
389
        self.assertRaises(WeaveFormatError,
 
390
                          k.get,
 
391
                          0)
 
392
 
 
393
        self.assertRaises(WeaveFormatError,
 
394
                          k.get,
 
395
                          1)
 
396
 
 
397
 
 
398
class InsertNested(TestBase):
 
399
    """Insertion with nested instructions."""
 
400
    def runTest(self):
 
401
        k = Weave()
 
402
 
 
403
        k._parents = [frozenset(),
 
404
                frozenset([0]),
 
405
                frozenset([0]),
 
406
                frozenset([0,1,2]),
 
407
                ]
 
408
        k._weave = [('{', 0),
 
409
                'foo {',
 
410
                ('{', 1),
 
411
                '  added in version 1',
 
412
                ('{', 2),
 
413
                '  added in v2',
 
414
                ('}', 2),
 
415
                '  also from v1',
 
416
                ('}', 1),
 
417
                '}',
 
418
                ('}', 0)]
 
419
 
 
420
        k._sha1s = [sha_string('foo {}')
 
421
                  , sha_string('foo {  added in version 1  also from v1}')
 
422
                  , sha_string('foo {  added in v2}')
 
423
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
424
                  ]
 
425
 
 
426
        self.assertEqual(k.get_lines(0),
 
427
                         ['foo {',
 
428
                          '}'])
 
429
 
 
430
        self.assertEqual(k.get_lines(1),
 
431
                         ['foo {',
 
432
                          '  added in version 1',
 
433
                          '  also from v1',
 
434
                          '}'])
 
435
                       
 
436
        self.assertEqual(k.get_lines(2),
 
437
                         ['foo {',
 
438
                          '  added in v2',
 
439
                          '}'])
 
440
 
 
441
        self.assertEqual(k.get_lines(3),
 
442
                         ['foo {',
 
443
                          '  added in version 1',
 
444
                          '  added in v2',
 
445
                          '  also from v1',
 
446
                          '}'])
 
447
                         
 
448
 
 
449
class DeleteLines2(TestBase):
 
450
    """Test recording revisions that delete lines.
 
451
 
 
452
    This relies on the weave having a way to represent lines knocked
 
453
    out by a later revision."""
 
454
    def runTest(self):
 
455
        k = Weave()
 
456
 
 
457
        k.add_lines('text0', [], ["line the first",
 
458
                   "line 2",
 
459
                   "line 3",
 
460
                   "fine"])
 
461
 
 
462
        self.assertEqual(len(k.get_lines(0)), 4)
 
463
 
 
464
        k.add_lines('text1', ['text0'], ["line the first",
 
465
                   "fine"])
 
466
 
 
467
        self.assertEqual(k.get_lines(1),
 
468
                         ["line the first",
 
469
                          "fine"])
 
470
 
 
471
        self.assertEqual(k.annotate('text1'),
 
472
                         [('text0', "line the first"),
 
473
                          ('text0', "fine")])
 
474
 
 
475
 
 
476
class IncludeVersions(TestBase):
 
477
    """Check texts that are stored across multiple revisions.
 
478
 
 
479
    Here we manually create a weave with particular encoding and make
 
480
    sure it unpacks properly.
 
481
 
 
482
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
483
    lines.
 
484
    """
 
485
 
 
486
    def runTest(self):
 
487
        k = Weave()
 
488
 
 
489
        k._parents = [frozenset(), frozenset([0])]
 
490
        k._weave = [('{', 0),
 
491
                "first line",
 
492
                ('}', 0),
 
493
                ('{', 1),
 
494
                "second line",
 
495
                ('}', 1)]
 
496
 
 
497
        k._sha1s = [sha_string('first line')
 
498
                  , sha_string('first linesecond line')]
 
499
 
 
500
        self.assertEqual(k.get_lines(1),
 
501
                         ["first line",
 
502
                          "second line"])
 
503
 
 
504
        self.assertEqual(k.get_lines(0),
 
505
                         ["first line"])
 
506
 
 
507
 
 
508
class DivergedIncludes(TestBase):
 
509
    """Weave with two diverged texts based on version 0.
 
510
    """
 
511
    def runTest(self):
 
512
        # FIXME make the weave, dont poke at it.
 
513
        k = Weave()
 
514
 
 
515
        k._names = ['0', '1', '2']
 
516
        k._name_map = {'0':0, '1':1, '2':2}
 
517
        k._parents = [frozenset(),
 
518
                frozenset([0]),
 
519
                frozenset([0]),
 
520
                ]
 
521
        k._weave = [('{', 0),
 
522
                "first line",
 
523
                ('}', 0),
 
524
                ('{', 1),
 
525
                "second line",
 
526
                ('}', 1),
 
527
                ('{', 2),
 
528
                "alternative second line",
 
529
                ('}', 2),                
 
530
                ]
 
531
 
 
532
        k._sha1s = [sha_string('first line')
 
533
                  , sha_string('first linesecond line')
 
534
                  , sha_string('first linealternative second line')]
 
535
 
 
536
        self.assertEqual(k.get_lines(0),
 
537
                         ["first line"])
 
538
 
 
539
        self.assertEqual(k.get_lines(1),
 
540
                         ["first line",
 
541
                          "second line"])
 
542
 
 
543
        self.assertEqual(k.get_lines('2'),
 
544
                         ["first line",
 
545
                          "alternative second line"])
 
546
 
 
547
        self.assertEqual(list(k.get_ancestry(['2'])),
 
548
                         ['0', '2'])
 
549
 
 
550
 
 
551
class ReplaceLine(TestBase):
 
552
    def runTest(self):
 
553
        k = Weave()
 
554
 
 
555
        text0 = ['cheddar', 'stilton', 'gruyere']
 
556
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
557
        
 
558
        k.add_lines('text0', [], text0)
 
559
        k.add_lines('text1', ['text0'], text1)
 
560
 
 
561
        self.log('k._weave=' + pformat(k._weave))
 
562
 
 
563
        self.assertEqual(k.get_lines(0), text0)
 
564
        self.assertEqual(k.get_lines(1), text1)
 
565
 
 
566
 
 
567
class Merge(TestBase):
 
568
    """Storage of versions that merge diverged parents"""
 
569
    def runTest(self):
 
570
        k = Weave()
 
571
 
 
572
        texts = [['header'],
 
573
                 ['header', '', 'line from 1'],
 
574
                 ['header', '', 'line from 2', 'more from 2'],
 
575
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
576
                 ]
 
577
 
 
578
        k.add_lines('text0', [], texts[0])
 
579
        k.add_lines('text1', ['text0'], texts[1])
 
580
        k.add_lines('text2', ['text0'], texts[2])
 
581
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
582
 
 
583
        for i, t in enumerate(texts):
 
584
            self.assertEqual(k.get_lines(i), t)
 
585
 
 
586
        self.assertEqual(k.annotate('merge'),
 
587
                         [('text0', 'header'),
 
588
                          ('text1', ''),
 
589
                          ('text1', 'line from 1'),
 
590
                          ('merge', 'fixup line'),
 
591
                          ('text2', 'line from 2'),
 
592
                          ])
 
593
 
 
594
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
595
                         ['text0', 'text1', 'text2', 'merge'])
 
596
 
 
597
        self.log('k._weave=' + pformat(k._weave))
 
598
 
 
599
        self.check_read_write(k)
 
600
 
 
601
 
 
602
class Conflicts(TestBase):
 
603
    """Test detection of conflicting regions during a merge.
 
604
 
 
605
    A base version is inserted, then two descendents try to
 
606
    insert different lines in the same place.  These should be
 
607
    reported as a possible conflict and forwarded to the user."""
 
608
    def runTest(self):
 
609
        return  # NOT RUN
 
610
        k = Weave()
 
611
 
 
612
        k.add_lines([], ['aaa', 'bbb'])
 
613
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
614
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
615
 
 
616
        merged = k.merge([1, 2])
 
617
 
 
618
        self.assertEquals([[['aaa']],
 
619
                           [['111'], ['222']],
 
620
                           [['bbb']]])
 
621
 
 
622
 
 
623
class NonConflict(TestBase):
 
624
    """Two descendants insert compatible changes.
 
625
 
 
626
    No conflict should be reported."""
 
627
    def runTest(self):
 
628
        return  # NOT RUN
 
629
        k = Weave()
 
630
 
 
631
        k.add_lines([], ['aaa', 'bbb'])
 
632
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
633
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
634
 
 
635
 
 
636
class Khayyam(TestBase):
 
637
    """Test changes to multi-line texts, and read/write"""
 
638
 
 
639
    def test_multi_line_merge(self):
 
640
        rawtexts = [
 
641
            """A Book of Verses underneath the Bough,
 
642
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
643
            Beside me singing in the Wilderness --
 
644
            Oh, Wilderness were Paradise enow!""",
 
645
            
 
646
            """A Book of Verses underneath the Bough,
 
647
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
648
            Beside me singing in the Wilderness --
 
649
            Oh, Wilderness were Paradise now!""",
 
650
 
 
651
            """A Book of poems underneath the tree,
 
652
            A Jug of Wine, a Loaf of Bread,
 
653
            and Thou
 
654
            Beside me singing in the Wilderness --
 
655
            Oh, Wilderness were Paradise now!
 
656
 
 
657
            -- O. Khayyam""",
 
658
 
 
659
            """A Book of Verses underneath the Bough,
 
660
            A Jug of Wine, a Loaf of Bread,
 
661
            and Thou
 
662
            Beside me singing in the Wilderness --
 
663
            Oh, Wilderness were Paradise now!""",
 
664
            ]
 
665
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
666
 
 
667
        k = Weave()
 
668
        parents = set()
 
669
        i = 0
 
670
        for t in texts:
 
671
            ver = k.add_lines('text%d' % i,
 
672
                        list(parents), t)
 
673
            parents.add('text%d' % i)
 
674
            i += 1
 
675
 
 
676
        self.log("k._weave=" + pformat(k._weave))
 
677
 
 
678
        for i, t in enumerate(texts):
 
679
            self.assertEqual(k.get_lines(i), t)
 
680
 
 
681
        self.check_read_write(k)
 
682
 
 
683
 
 
684
class MergeCases(TestBase):
 
685
    def doMerge(self, base, a, b, mp):
 
686
        from cStringIO import StringIO
 
687
        from textwrap import dedent
 
688
 
 
689
        def addcrlf(x):
 
690
            return x + '\n'
 
691
        
 
692
        w = Weave()
 
693
        w.add_lines('text0', [], map(addcrlf, base))
 
694
        w.add_lines('text1', ['text0'], map(addcrlf, a))
 
695
        w.add_lines('text2', ['text0'], map(addcrlf, b))
 
696
 
 
697
        self.log('weave is:')
 
698
        tmpf = StringIO()
 
699
        write_weave(w, tmpf)
 
700
        self.log(tmpf.getvalue())
 
701
 
 
702
        self.log('merge plan:')
 
703
        p = list(w.plan_merge('text1', 'text2'))
 
704
        for state, line in p:
 
705
            if line:
 
706
                self.log('%12s | %s' % (state, line[:-1]))
 
707
 
 
708
        self.log('merge:')
 
709
        mt = StringIO()
 
710
        mt.writelines(w.weave_merge(p))
 
711
        mt.seek(0)
 
712
        self.log(mt.getvalue())
 
713
 
 
714
        mp = map(addcrlf, mp)
 
715
        self.assertEqual(mt.readlines(), mp)
 
716
        
 
717
        
 
718
    def testOneInsert(self):
 
719
        self.doMerge([],
 
720
                     ['aa'],
 
721
                     [],
 
722
                     ['aa'])
 
723
 
 
724
    def testSeparateInserts(self):
 
725
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
726
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
727
                     ['aaa', 'bbb', 'yyy', 'ccc'],
 
728
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
729
 
 
730
    def testSameInsert(self):
 
731
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
732
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
733
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
 
734
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
735
 
 
736
    def testOverlappedInsert(self):
 
737
        self.doMerge(['aaa', 'bbb'],
 
738
                     ['aaa', 'xxx', 'yyy', 'bbb'],
 
739
                     ['aaa', 'xxx', 'bbb'],
 
740
                     ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======', 'xxx', 
 
741
                      '>>>>>>> ', 'bbb'])
 
742
 
 
743
        # really it ought to reduce this to 
 
744
        # ['aaa', 'xxx', 'yyy', 'bbb']
 
745
 
 
746
 
 
747
    def testClashReplace(self):
 
748
        self.doMerge(['aaa'],
 
749
                     ['xxx'],
 
750
                     ['yyy', 'zzz'],
 
751
                     ['<<<<<<< ', 'xxx', '=======', 'yyy', 'zzz', 
 
752
                      '>>>>>>> '])
 
753
 
 
754
    def testNonClashInsert(self):
 
755
        self.doMerge(['aaa'],
 
756
                     ['xxx', 'aaa'],
 
757
                     ['yyy', 'zzz'],
 
758
                     ['<<<<<<< ', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
 
759
                      '>>>>>>> '])
 
760
 
 
761
        self.doMerge(['aaa'],
 
762
                     ['aaa'],
 
763
                     ['yyy', 'zzz'],
 
764
                     ['yyy', 'zzz'])
 
765
 
 
766
 
 
767
    def testDeleteAndModify(self):
 
768
        """Clashing delete and modification.
 
769
 
 
770
        If one side modifies a region and the other deletes it then
 
771
        there should be a conflict with one side blank.
 
772
        """
 
773
 
 
774
        #######################################
 
775
        # skippd, not working yet
 
776
        return
 
777
        
 
778
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
779
                     ['aaa', 'ddd', 'ccc'],
 
780
                     ['aaa', 'ccc'],
 
781
                     ['<<<<<<<< ', 'aaa', '=======', '>>>>>>> ', 'ccc'])
 
782
 
 
783
 
 
784
class JoinWeavesTests(TestBase):
 
785
    def setUp(self):
 
786
        super(JoinWeavesTests, self).setUp()
 
787
        self.weave1 = Weave()
 
788
        self.lines1 = ['hello\n']
 
789
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
790
        self.weave1.add_lines('v1', [], self.lines1)
 
791
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
792
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
793
        
 
794
    def test_join_empty(self):
 
795
        """Join two empty weaves."""
 
796
        eq = self.assertEqual
 
797
        w1 = Weave()
 
798
        w2 = Weave()
 
799
        w1.join(w2)
 
800
        eq(len(w1), 0)
 
801
        
 
802
    def test_join_empty_to_nonempty(self):
 
803
        """Join empty weave onto nonempty."""
 
804
        self.weave1.join(Weave())
 
805
        self.assertEqual(len(self.weave1), 3)
 
806
 
 
807
    def test_join_unrelated(self):
 
808
        """Join two weaves with no history in common."""
 
809
        wb = Weave()
 
810
        wb.add_lines('b1', [], ['line from b\n'])
 
811
        w1 = self.weave1
 
812
        w1.join(wb)
 
813
        eq = self.assertEqual
 
814
        eq(len(w1), 4)
 
815
        eq(sorted(w1.versions()),
 
816
           ['b1', 'v1', 'v2', 'v3'])
 
817
 
 
818
    def test_join_related(self):
 
819
        wa = self.weave1.copy()
 
820
        wb = self.weave1.copy()
 
821
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
822
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
823
        eq = self.assertEquals
 
824
        eq(len(wa), 4)
 
825
        eq(len(wb), 4)
 
826
        wa.join(wb)
 
827
        eq(len(wa), 5)
 
828
        eq(wa.get_lines('b1'),
 
829
           ['hello\n', 'pale blue\n', 'world\n'])
 
830
 
 
831
    def test_join_parent_disagreement(self):
 
832
        #join reconciles differening parents into a union.
 
833
        wa = Weave()
 
834
        wb = Weave()
 
835
        wa.add_lines('v1', [], ['hello\n'])
 
836
        wb.add_lines('v0', [], [])
 
837
        wb.add_lines('v1', ['v0'], ['hello\n'])
 
838
        wa.join(wb)
 
839
        self.assertEqual(['v0'], wa.get_parents('v1'))
 
840
 
 
841
    def test_join_text_disagreement(self):
 
842
        """Cannot join weaves with different texts for a version."""
 
843
        wa = Weave()
 
844
        wb = Weave()
 
845
        wa.add_lines('v1', [], ['hello\n'])
 
846
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
847
        self.assertRaises(WeaveError,
 
848
                          wa.join, wb)
 
849
 
 
850
    def test_join_unordered(self):
 
851
        """Join weaves where indexes differ.
 
852
        
 
853
        The source weave contains a different version at index 0."""
 
854
        wa = self.weave1.copy()
 
855
        wb = Weave()
 
856
        wb.add_lines('x1', [], ['line from x1\n'])
 
857
        wb.add_lines('v1', [], ['hello\n'])
 
858
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
859
        wa.join(wb)
 
860
        eq = self.assertEquals
 
861
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
862
        eq(wa.get_text('x1'), 'line from x1\n')
 
863
 
 
864
    def test_written_detection(self):
 
865
        # Test detection of weave file corruption.
 
866
        #
 
867
        # Make sure that we can detect if a weave file has
 
868
        # been corrupted. This doesn't test all forms of corruption,
 
869
        # but it at least helps verify the data you get, is what you want.
 
870
        from cStringIO import StringIO
 
871
 
 
872
        w = Weave()
 
873
        w.add_lines('v1', [], ['hello\n'])
 
874
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
875
 
 
876
        tmpf = StringIO()
 
877
        write_weave(w, tmpf)
 
878
 
 
879
        # Because we are corrupting, we need to make sure we have the exact text
 
880
        self.assertEquals('# bzr weave file v5\n'
 
881
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
882
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
883
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
884
                          tmpf.getvalue())
 
885
 
 
886
        # Change a single letter
 
887
        tmpf = StringIO('# bzr weave file v5\n'
 
888
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
889
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
890
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
891
 
 
892
        w = read_weave(tmpf)
 
893
 
 
894
        self.assertEqual('hello\n', w.get_text('v1'))
 
895
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
896
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
897
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
898
 
 
899
        # Change the sha checksum
 
900
        tmpf = StringIO('# bzr weave file v5\n'
 
901
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
902
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
903
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
904
 
 
905
        w = read_weave(tmpf)
 
906
 
 
907
        self.assertEqual('hello\n', w.get_text('v1'))
 
908
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
909
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
910
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
911
 
 
912
 
 
913
class InstrumentedWeave(Weave):
 
914
    """Keep track of how many times functions are called."""
 
915
    
 
916
    def __init__(self, weave_name=None):
 
917
        self._extract_count = 0
 
918
        Weave.__init__(self, weave_name=weave_name)
 
919
 
 
920
    def _extract(self, versions):
 
921
        self._extract_count += 1
 
922
        return Weave._extract(self, versions)
 
923
 
 
924
 
 
925
class JoinOptimization(TestCase):
 
926
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
927
 
 
928
    def test_join(self):
 
929
        w1 = InstrumentedWeave()
 
930
        w2 = InstrumentedWeave()
 
931
 
 
932
        txt0 = ['a\n']
 
933
        txt1 = ['a\n', 'b\n']
 
934
        txt2 = ['a\n', 'c\n']
 
935
        txt3 = ['a\n', 'b\n', 'c\n']
 
936
 
 
937
        w1.add_lines('txt0', [], txt0) # extract 1a
 
938
        w2.add_lines('txt0', [], txt0) # extract 1b
 
939
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
940
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
941
        w1.join(w2) # extract 3a to add txt2 
 
942
        w2.join(w1) # extract 3b to add txt1 
 
943
 
 
944
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
945
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
946
        # These secretly have inverted parents
 
947
 
 
948
        # This should not have to do any extractions
 
949
        w1.join(w2) # NO extract, texts already present with same parents
 
950
        w2.join(w1) # NO extract, texts already present with same parents
 
951
 
 
952
        self.assertEqual(4, w1._extract_count)
 
953
        self.assertEqual(4, w2._extract_count)
 
954
 
 
955
    def test_double_parent(self):
 
956
        # It should not be considered illegal to add
 
957
        # a revision with the same parent twice
 
958
        w1 = InstrumentedWeave()
 
959
        w2 = InstrumentedWeave()
 
960
 
 
961
        txt0 = ['a\n']
 
962
        txt1 = ['a\n', 'b\n']
 
963
        txt2 = ['a\n', 'c\n']
 
964
        txt3 = ['a\n', 'b\n', 'c\n']
 
965
 
 
966
        w1.add_lines('txt0', [], txt0)
 
967
        w2.add_lines('txt0', [], txt0)
 
968
        w1.add_lines('txt1', ['txt0'], txt1)
 
969
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
970
        # Same text, effectively the same, because the
 
971
        # parent is only repeated
 
972
        w1.join(w2) # extract 3a to add txt2 
 
973
        w2.join(w1) # extract 3b to add txt1 
 
974
 
 
975
 
 
976
class TestNeedsRweave(TestCase):
 
977
    """Internal corner cases for when reweave is needed."""
 
978
 
 
979
    def test_compatible_parents(self):
 
980
        w1 = Weave('a')
 
981
        my_parents = set([1, 2, 3])
 
982
        # subsets are ok
 
983
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
984
        # same sets
 
985
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
986
        # same empty corner case
 
987
        self.assertTrue(w1._compatible_parents(set(), set()))
 
988
        # other cannot contain stuff my_parents does not
 
989
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
990
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
991
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
992
        
 
993