~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Martin Pool
  • Date: 2005-05-09 03:03:55 UTC
  • Revision ID: mbp@sourcefrog.net-20050509030355-ad6ab558d1362959
- Don't give an error if the trace file can't be opened

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/python2.4
2
 
 
3
 
# Copyright (C) 2005 by Canonical Ltd
4
 
 
5
 
# This program is free software; you can redistribute it and/or modify
6
 
# it under the terms of the GNU General Public License as published by
7
 
# the Free Software Foundation; either version 2 of the License, or
8
 
# (at your option) any later version.
9
 
 
10
 
# This program is distributed in the hope that it will be useful,
11
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
# GNU General Public License for more details.
14
 
 
15
 
# You should have received a copy of the GNU General Public License
16
 
# along with this program; if not, write to the Free Software
17
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
 
 
19
 
 
20
 
# TODO: tests regarding version names
21
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
22
 
#       if it fails.
23
 
 
24
 
"""test suite for weave algorithm"""
25
 
 
26
 
from pprint import pformat
27
 
 
28
 
import bzrlib.errors as errors
29
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
30
 
from bzrlib.weavefile import write_weave, read_weave
31
 
from bzrlib.tests import TestCase
32
 
from bzrlib.osutils import sha_string
33
 
 
34
 
 
35
 
# texts for use in testing
36
 
TEXT_0 = ["Hello world"]
37
 
TEXT_1 = ["Hello world",
38
 
          "A second line"]
39
 
 
40
 
 
41
 
class TestBase(TestCase):
42
 
    def check_read_write(self, k):
43
 
        """Check the weave k can be written & re-read."""
44
 
        from tempfile import TemporaryFile
45
 
        tf = TemporaryFile()
46
 
 
47
 
        write_weave(k, tf)
48
 
        tf.seek(0)
49
 
        k2 = read_weave(tf)
50
 
 
51
 
        if k != k2:
52
 
            tf.seek(0)
53
 
            self.log('serialized weave:')
54
 
            self.log(tf.read())
55
 
 
56
 
            self.log('')
57
 
            self.log('parents: %s' % (k._parents == k2._parents))
58
 
            self.log('         %r' % k._parents)
59
 
            self.log('         %r' % k2._parents)
60
 
            self.log('')
61
 
            self.fail('read/write check failed')
62
 
 
63
 
 
64
 
class WeaveContains(TestBase):
65
 
    """Weave __contains__ operator"""
66
 
    def runTest(self):
67
 
        k = Weave()
68
 
        self.assertFalse('foo' in k)
69
 
        k.add_lines('foo', [], TEXT_1)
70
 
        self.assertTrue('foo' in k)
71
 
 
72
 
 
73
 
class Easy(TestBase):
74
 
    def runTest(self):
75
 
        k = Weave()
76
 
 
77
 
 
78
 
class StoreText(TestBase):
79
 
    """Store and retrieve a simple text."""
80
 
 
81
 
    def test_storing_text(self):
82
 
        k = Weave()
83
 
        idx = k.add_lines('text0', [], TEXT_0)
84
 
        self.assertEqual(k.get_lines(idx), TEXT_0)
85
 
        self.assertEqual(idx, 0)
86
 
 
87
 
 
88
 
class AnnotateOne(TestBase):
89
 
    def runTest(self):
90
 
        k = Weave()
91
 
        k.add_lines('text0', [], TEXT_0)
92
 
        self.assertEqual(k.annotate('text0'),
93
 
                         [('text0', TEXT_0[0])])
94
 
 
95
 
 
96
 
class StoreTwo(TestBase):
97
 
    def runTest(self):
98
 
        k = Weave()
99
 
 
100
 
        idx = k.add_lines('text0', [], TEXT_0)
101
 
        self.assertEqual(idx, 0)
102
 
 
103
 
        idx = k.add_lines('text1', [], TEXT_1)
104
 
        self.assertEqual(idx, 1)
105
 
 
106
 
        self.assertEqual(k.get_lines(0), TEXT_0)
107
 
        self.assertEqual(k.get_lines(1), TEXT_1)
108
 
 
109
 
 
110
 
class GetSha1(TestBase):
111
 
    def test_get_sha1(self):
112
 
        k = Weave()
113
 
        k.add_lines('text0', [], 'text0')
114
 
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
115
 
                         k.get_sha1('text0'))
116
 
        self.assertRaises(errors.RevisionNotPresent,
117
 
                          k.get_sha1, 0)
118
 
        self.assertRaises(errors.RevisionNotPresent,
119
 
                          k.get_sha1, 'text1')
120
 
                        
121
 
 
122
 
class InvalidAdd(TestBase):
123
 
    """Try to use invalid version number during add."""
124
 
    def runTest(self):
125
 
        k = Weave()
126
 
 
127
 
        self.assertRaises(errors.RevisionNotPresent,
128
 
                          k.add_lines,
129
 
                          'text0',
130
 
                          ['69'],
131
 
                          ['new text!'])
132
 
 
133
 
 
134
 
class RepeatedAdd(TestBase):
135
 
    """Add the same version twice; harmless."""
136
 
    def runTest(self):
137
 
        k = Weave()
138
 
        idx = k.add_lines('text0', [], TEXT_0)
139
 
        idx2 = k.add_lines('text0', [], TEXT_0)
140
 
        self.assertEqual(idx, idx2)
141
 
 
142
 
 
143
 
class InvalidRepeatedAdd(TestBase):
144
 
    def runTest(self):
145
 
        k = Weave()
146
 
        k.add_lines('basis', [], TEXT_0)
147
 
        idx = k.add_lines('text0', [], TEXT_0)
148
 
        self.assertRaises(errors.RevisionAlreadyPresent,
149
 
                          k.add_lines,
150
 
                          'text0',
151
 
                          [],
152
 
                          ['not the same text'])
153
 
        self.assertRaises(errors.RevisionAlreadyPresent,
154
 
                          k.add_lines,
155
 
                          'text0',
156
 
                          ['basis'],         # not the right parents
157
 
                          TEXT_0)
158
 
        
159
 
 
160
 
class InsertLines(TestBase):
161
 
    """Store a revision that adds one line to the original.
162
 
 
163
 
    Look at the annotations to make sure that the first line is matched
164
 
    and not stored repeatedly."""
165
 
    def runTest(self):
166
 
        k = Weave()
167
 
 
168
 
        k.add_lines('text0', [], ['line 1'])
169
 
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
170
 
 
171
 
        self.assertEqual(k.annotate('text0'),
172
 
                         [('text0', 'line 1')])
173
 
 
174
 
        self.assertEqual(k.get_lines(1),
175
 
                         ['line 1',
176
 
                          'line 2'])
177
 
 
178
 
        self.assertEqual(k.annotate('text1'),
179
 
                         [('text0', 'line 1'),
180
 
                          ('text1', 'line 2')])
181
 
 
182
 
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
183
 
 
184
 
        self.assertEqual(k.annotate('text2'),
185
 
                         [('text0', 'line 1'),
186
 
                          ('text2', 'diverged line')])
187
 
 
188
 
        text3 = ['line 1', 'middle line', 'line 2']
189
 
        k.add_lines('text3',
190
 
              ['text0', 'text1'],
191
 
              text3)
192
 
 
193
 
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
194
 
 
195
 
        self.log("k._weave=" + pformat(k._weave))
196
 
 
197
 
        self.assertEqual(k.annotate('text3'),
198
 
                         [('text0', 'line 1'),
199
 
                          ('text3', 'middle line'),
200
 
                          ('text1', 'line 2')])
201
 
 
202
 
        # now multiple insertions at different places
203
 
        k.add_lines('text4',
204
 
              ['text0', 'text1', 'text3'],
205
 
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
206
 
 
207
 
        self.assertEqual(k.annotate('text4'), 
208
 
                         [('text0', 'line 1'),
209
 
                          ('text4', 'aaa'),
210
 
                          ('text3', 'middle line'),
211
 
                          ('text4', 'bbb'),
212
 
                          ('text1', 'line 2'),
213
 
                          ('text4', 'ccc')])
214
 
 
215
 
 
216
 
class DeleteLines(TestBase):
217
 
    """Deletion of lines from existing text.
218
 
 
219
 
    Try various texts all based on a common ancestor."""
220
 
    def runTest(self):
221
 
        k = Weave()
222
 
 
223
 
        base_text = ['one', 'two', 'three', 'four']
224
 
 
225
 
        k.add_lines('text0', [], base_text)
226
 
        
227
 
        texts = [['one', 'two', 'three'],
228
 
                 ['two', 'three', 'four'],
229
 
                 ['one', 'four'],
230
 
                 ['one', 'two', 'three', 'four'],
231
 
                 ]
232
 
 
233
 
        i = 1
234
 
        for t in texts:
235
 
            ver = k.add_lines('text%d' % i,
236
 
                        ['text0'], t)
237
 
            i += 1
238
 
 
239
 
        self.log('final weave:')
240
 
        self.log('k._weave=' + pformat(k._weave))
241
 
 
242
 
        for i in range(len(texts)):
243
 
            self.assertEqual(k.get_lines(i+1),
244
 
                             texts[i])
245
 
 
246
 
 
247
 
class SuicideDelete(TestBase):
248
 
    """Invalid weave which tries to add and delete simultaneously."""
249
 
    def runTest(self):
250
 
        k = Weave()
251
 
 
252
 
        k._parents = [(),
253
 
                ]
254
 
        k._weave = [('{', 0),
255
 
                'first line',
256
 
                ('[', 0),
257
 
                'deleted in 0',
258
 
                (']', 0),
259
 
                ('}', 0),
260
 
                ]
261
 
        ################################### SKIPPED
262
 
        # Weave.get doesn't trap this anymore
263
 
        return 
264
 
 
265
 
        self.assertRaises(WeaveFormatError,
266
 
                          k.get_lines,
267
 
                          0)        
268
 
 
269
 
 
270
 
class CannedDelete(TestBase):
271
 
    """Unpack canned weave with deleted lines."""
272
 
    def runTest(self):
273
 
        k = Weave()
274
 
 
275
 
        k._parents = [(),
276
 
                frozenset([0]),
277
 
                ]
278
 
        k._weave = [('{', 0),
279
 
                'first line',
280
 
                ('[', 1),
281
 
                'line to be deleted',
282
 
                (']', 1),
283
 
                'last line',
284
 
                ('}', 0),
285
 
                ]
286
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
287
 
                  , sha_string('first linelast line')]
288
 
 
289
 
        self.assertEqual(k.get_lines(0),
290
 
                         ['first line',
291
 
                          'line to be deleted',
292
 
                          'last line',
293
 
                          ])
294
 
 
295
 
        self.assertEqual(k.get_lines(1),
296
 
                         ['first line',
297
 
                          'last line',
298
 
                          ])
299
 
 
300
 
 
301
 
class CannedReplacement(TestBase):
302
 
    """Unpack canned weave with deleted lines."""
303
 
    def runTest(self):
304
 
        k = Weave()
305
 
 
306
 
        k._parents = [frozenset(),
307
 
                frozenset([0]),
308
 
                ]
309
 
        k._weave = [('{', 0),
310
 
                'first line',
311
 
                ('[', 1),
312
 
                'line to be deleted',
313
 
                (']', 1),
314
 
                ('{', 1),
315
 
                'replacement line',                
316
 
                ('}', 1),
317
 
                'last line',
318
 
                ('}', 0),
319
 
                ]
320
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
321
 
                  , sha_string('first linereplacement linelast line')]
322
 
 
323
 
        self.assertEqual(k.get_lines(0),
324
 
                         ['first line',
325
 
                          'line to be deleted',
326
 
                          'last line',
327
 
                          ])
328
 
 
329
 
        self.assertEqual(k.get_lines(1),
330
 
                         ['first line',
331
 
                          'replacement line',
332
 
                          'last line',
333
 
                          ])
334
 
 
335
 
 
336
 
class BadWeave(TestBase):
337
 
    """Test that we trap an insert which should not occur."""
338
 
    def runTest(self):
339
 
        k = Weave()
340
 
 
341
 
        k._parents = [frozenset(),
342
 
                ]
343
 
        k._weave = ['bad line',
344
 
                ('{', 0),
345
 
                'foo {',
346
 
                ('{', 1),
347
 
                '  added in version 1',
348
 
                ('{', 2),
349
 
                '  added in v2',
350
 
                ('}', 2),
351
 
                '  also from v1',
352
 
                ('}', 1),
353
 
                '}',
354
 
                ('}', 0)]
355
 
 
356
 
        ################################### SKIPPED
357
 
        # Weave.get doesn't trap this anymore
358
 
        return 
359
 
 
360
 
 
361
 
        self.assertRaises(WeaveFormatError,
362
 
                          k.get,
363
 
                          0)
364
 
 
365
 
 
366
 
class BadInsert(TestBase):
367
 
    """Test that we trap an insert which should not occur."""
368
 
    def runTest(self):
369
 
        k = Weave()
370
 
 
371
 
        k._parents = [frozenset(),
372
 
                frozenset([0]),
373
 
                frozenset([0]),
374
 
                frozenset([0,1,2]),
375
 
                ]
376
 
        k._weave = [('{', 0),
377
 
                'foo {',
378
 
                ('{', 1),
379
 
                '  added in version 1',
380
 
                ('{', 1),
381
 
                '  more in 1',
382
 
                ('}', 1),
383
 
                ('}', 1),
384
 
                ('}', 0)]
385
 
 
386
 
 
387
 
        # this is not currently enforced by get
388
 
        return  ##########################################
389
 
 
390
 
        self.assertRaises(WeaveFormatError,
391
 
                          k.get,
392
 
                          0)
393
 
 
394
 
        self.assertRaises(WeaveFormatError,
395
 
                          k.get,
396
 
                          1)
397
 
 
398
 
 
399
 
class InsertNested(TestBase):
400
 
    """Insertion with nested instructions."""
401
 
    def runTest(self):
402
 
        k = Weave()
403
 
 
404
 
        k._parents = [frozenset(),
405
 
                frozenset([0]),
406
 
                frozenset([0]),
407
 
                frozenset([0,1,2]),
408
 
                ]
409
 
        k._weave = [('{', 0),
410
 
                'foo {',
411
 
                ('{', 1),
412
 
                '  added in version 1',
413
 
                ('{', 2),
414
 
                '  added in v2',
415
 
                ('}', 2),
416
 
                '  also from v1',
417
 
                ('}', 1),
418
 
                '}',
419
 
                ('}', 0)]
420
 
 
421
 
        k._sha1s = [sha_string('foo {}')
422
 
                  , sha_string('foo {  added in version 1  also from v1}')
423
 
                  , sha_string('foo {  added in v2}')
424
 
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
425
 
                  ]
426
 
 
427
 
        self.assertEqual(k.get_lines(0),
428
 
                         ['foo {',
429
 
                          '}'])
430
 
 
431
 
        self.assertEqual(k.get_lines(1),
432
 
                         ['foo {',
433
 
                          '  added in version 1',
434
 
                          '  also from v1',
435
 
                          '}'])
436
 
                       
437
 
        self.assertEqual(k.get_lines(2),
438
 
                         ['foo {',
439
 
                          '  added in v2',
440
 
                          '}'])
441
 
 
442
 
        self.assertEqual(k.get_lines(3),
443
 
                         ['foo {',
444
 
                          '  added in version 1',
445
 
                          '  added in v2',
446
 
                          '  also from v1',
447
 
                          '}'])
448
 
                         
449
 
 
450
 
class DeleteLines2(TestBase):
451
 
    """Test recording revisions that delete lines.
452
 
 
453
 
    This relies on the weave having a way to represent lines knocked
454
 
    out by a later revision."""
455
 
    def runTest(self):
456
 
        k = Weave()
457
 
 
458
 
        k.add_lines('text0', [], ["line the first",
459
 
                   "line 2",
460
 
                   "line 3",
461
 
                   "fine"])
462
 
 
463
 
        self.assertEqual(len(k.get_lines(0)), 4)
464
 
 
465
 
        k.add_lines('text1', ['text0'], ["line the first",
466
 
                   "fine"])
467
 
 
468
 
        self.assertEqual(k.get_lines(1),
469
 
                         ["line the first",
470
 
                          "fine"])
471
 
 
472
 
        self.assertEqual(k.annotate('text1'),
473
 
                         [('text0', "line the first"),
474
 
                          ('text0', "fine")])
475
 
 
476
 
 
477
 
class IncludeVersions(TestBase):
478
 
    """Check texts that are stored across multiple revisions.
479
 
 
480
 
    Here we manually create a weave with particular encoding and make
481
 
    sure it unpacks properly.
482
 
 
483
 
    Text 0 includes nothing; text 1 includes text 0 and adds some
484
 
    lines.
485
 
    """
486
 
 
487
 
    def runTest(self):
488
 
        k = Weave()
489
 
 
490
 
        k._parents = [frozenset(), frozenset([0])]
491
 
        k._weave = [('{', 0),
492
 
                "first line",
493
 
                ('}', 0),
494
 
                ('{', 1),
495
 
                "second line",
496
 
                ('}', 1)]
497
 
 
498
 
        k._sha1s = [sha_string('first line')
499
 
                  , sha_string('first linesecond line')]
500
 
 
501
 
        self.assertEqual(k.get_lines(1),
502
 
                         ["first line",
503
 
                          "second line"])
504
 
 
505
 
        self.assertEqual(k.get_lines(0),
506
 
                         ["first line"])
507
 
 
508
 
 
509
 
class DivergedIncludes(TestBase):
510
 
    """Weave with two diverged texts based on version 0.
511
 
    """
512
 
    def runTest(self):
513
 
        # FIXME make the weave, dont poke at it.
514
 
        k = Weave()
515
 
 
516
 
        k._names = ['0', '1', '2']
517
 
        k._name_map = {'0':0, '1':1, '2':2}
518
 
        k._parents = [frozenset(),
519
 
                frozenset([0]),
520
 
                frozenset([0]),
521
 
                ]
522
 
        k._weave = [('{', 0),
523
 
                "first line",
524
 
                ('}', 0),
525
 
                ('{', 1),
526
 
                "second line",
527
 
                ('}', 1),
528
 
                ('{', 2),
529
 
                "alternative second line",
530
 
                ('}', 2),                
531
 
                ]
532
 
 
533
 
        k._sha1s = [sha_string('first line')
534
 
                  , sha_string('first linesecond line')
535
 
                  , sha_string('first linealternative second line')]
536
 
 
537
 
        self.assertEqual(k.get_lines(0),
538
 
                         ["first line"])
539
 
 
540
 
        self.assertEqual(k.get_lines(1),
541
 
                         ["first line",
542
 
                          "second line"])
543
 
 
544
 
        self.assertEqual(k.get_lines('2'),
545
 
                         ["first line",
546
 
                          "alternative second line"])
547
 
 
548
 
        self.assertEqual(list(k.get_ancestry(['2'])),
549
 
                         ['0', '2'])
550
 
 
551
 
 
552
 
class ReplaceLine(TestBase):
553
 
    def runTest(self):
554
 
        k = Weave()
555
 
 
556
 
        text0 = ['cheddar', 'stilton', 'gruyere']
557
 
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
558
 
        
559
 
        k.add_lines('text0', [], text0)
560
 
        k.add_lines('text1', ['text0'], text1)
561
 
 
562
 
        self.log('k._weave=' + pformat(k._weave))
563
 
 
564
 
        self.assertEqual(k.get_lines(0), text0)
565
 
        self.assertEqual(k.get_lines(1), text1)
566
 
 
567
 
 
568
 
class Merge(TestBase):
569
 
    """Storage of versions that merge diverged parents"""
570
 
    def runTest(self):
571
 
        k = Weave()
572
 
 
573
 
        texts = [['header'],
574
 
                 ['header', '', 'line from 1'],
575
 
                 ['header', '', 'line from 2', 'more from 2'],
576
 
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
577
 
                 ]
578
 
 
579
 
        k.add_lines('text0', [], texts[0])
580
 
        k.add_lines('text1', ['text0'], texts[1])
581
 
        k.add_lines('text2', ['text0'], texts[2])
582
 
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
583
 
 
584
 
        for i, t in enumerate(texts):
585
 
            self.assertEqual(k.get_lines(i), t)
586
 
 
587
 
        self.assertEqual(k.annotate('merge'),
588
 
                         [('text0', 'header'),
589
 
                          ('text1', ''),
590
 
                          ('text1', 'line from 1'),
591
 
                          ('merge', 'fixup line'),
592
 
                          ('text2', 'line from 2'),
593
 
                          ])
594
 
 
595
 
        self.assertEqual(list(k.get_ancestry(['merge'])),
596
 
                         ['text0', 'text1', 'text2', 'merge'])
597
 
 
598
 
        self.log('k._weave=' + pformat(k._weave))
599
 
 
600
 
        self.check_read_write(k)
601
 
 
602
 
 
603
 
class Conflicts(TestBase):
604
 
    """Test detection of conflicting regions during a merge.
605
 
 
606
 
    A base version is inserted, then two descendents try to
607
 
    insert different lines in the same place.  These should be
608
 
    reported as a possible conflict and forwarded to the user."""
609
 
    def runTest(self):
610
 
        return  # NOT RUN
611
 
        k = Weave()
612
 
 
613
 
        k.add_lines([], ['aaa', 'bbb'])
614
 
        k.add_lines([0], ['aaa', '111', 'bbb'])
615
 
        k.add_lines([1], ['aaa', '222', 'bbb'])
616
 
 
617
 
        merged = k.merge([1, 2])
618
 
 
619
 
        self.assertEquals([[['aaa']],
620
 
                           [['111'], ['222']],
621
 
                           [['bbb']]])
622
 
 
623
 
 
624
 
class NonConflict(TestBase):
625
 
    """Two descendants insert compatible changes.
626
 
 
627
 
    No conflict should be reported."""
628
 
    def runTest(self):
629
 
        return  # NOT RUN
630
 
        k = Weave()
631
 
 
632
 
        k.add_lines([], ['aaa', 'bbb'])
633
 
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
634
 
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
635
 
 
636
 
 
637
 
class Khayyam(TestBase):
638
 
    """Test changes to multi-line texts, and read/write"""
639
 
 
640
 
    def test_multi_line_merge(self):
641
 
        rawtexts = [
642
 
            """A Book of Verses underneath the Bough,
643
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
644
 
            Beside me singing in the Wilderness --
645
 
            Oh, Wilderness were Paradise enow!""",
646
 
            
647
 
            """A Book of Verses underneath the Bough,
648
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
649
 
            Beside me singing in the Wilderness --
650
 
            Oh, Wilderness were Paradise now!""",
651
 
 
652
 
            """A Book of poems underneath the tree,
653
 
            A Jug of Wine, a Loaf of Bread,
654
 
            and Thou
655
 
            Beside me singing in the Wilderness --
656
 
            Oh, Wilderness were Paradise now!
657
 
 
658
 
            -- O. Khayyam""",
659
 
 
660
 
            """A Book of Verses underneath the Bough,
661
 
            A Jug of Wine, a Loaf of Bread,
662
 
            and Thou
663
 
            Beside me singing in the Wilderness --
664
 
            Oh, Wilderness were Paradise now!""",
665
 
            ]
666
 
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
667
 
 
668
 
        k = Weave()
669
 
        parents = set()
670
 
        i = 0
671
 
        for t in texts:
672
 
            ver = k.add_lines('text%d' % i,
673
 
                        list(parents), t)
674
 
            parents.add('text%d' % i)
675
 
            i += 1
676
 
 
677
 
        self.log("k._weave=" + pformat(k._weave))
678
 
 
679
 
        for i, t in enumerate(texts):
680
 
            self.assertEqual(k.get_lines(i), t)
681
 
 
682
 
        self.check_read_write(k)
683
 
 
684
 
 
685
 
class MergeCases(TestBase):
686
 
    def doMerge(self, base, a, b, mp):
687
 
        from cStringIO import StringIO
688
 
        from textwrap import dedent
689
 
 
690
 
        def addcrlf(x):
691
 
            return x + '\n'
692
 
        
693
 
        w = Weave()
694
 
        w.add_lines('text0', [], map(addcrlf, base))
695
 
        w.add_lines('text1', ['text0'], map(addcrlf, a))
696
 
        w.add_lines('text2', ['text0'], map(addcrlf, b))
697
 
 
698
 
        self.log('weave is:')
699
 
        tmpf = StringIO()
700
 
        write_weave(w, tmpf)
701
 
        self.log(tmpf.getvalue())
702
 
 
703
 
        self.log('merge plan:')
704
 
        p = list(w.plan_merge('text1', 'text2'))
705
 
        for state, line in p:
706
 
            if line:
707
 
                self.log('%12s | %s' % (state, line[:-1]))
708
 
 
709
 
        self.log('merge:')
710
 
        mt = StringIO()
711
 
        mt.writelines(w.weave_merge(p))
712
 
        mt.seek(0)
713
 
        self.log(mt.getvalue())
714
 
 
715
 
        mp = map(addcrlf, mp)
716
 
        self.assertEqual(mt.readlines(), mp)
717
 
        
718
 
        
719
 
    def testOneInsert(self):
720
 
        self.doMerge([],
721
 
                     ['aa'],
722
 
                     [],
723
 
                     ['aa'])
724
 
 
725
 
    def testSeparateInserts(self):
726
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
727
 
                     ['aaa', 'xxx', 'bbb', 'ccc'],
728
 
                     ['aaa', 'bbb', 'yyy', 'ccc'],
729
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
730
 
 
731
 
    def testSameInsert(self):
732
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
733
 
                     ['aaa', 'xxx', 'bbb', 'ccc'],
734
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
735
 
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
736
 
 
737
 
    def testOverlappedInsert(self):
738
 
        self.doMerge(['aaa', 'bbb'],
739
 
                     ['aaa', 'xxx', 'yyy', 'bbb'],
740
 
                     ['aaa', 'xxx', 'bbb'],
741
 
                     ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======', 'xxx', 
742
 
                      '>>>>>>> ', 'bbb'])
743
 
 
744
 
        # really it ought to reduce this to 
745
 
        # ['aaa', 'xxx', 'yyy', 'bbb']
746
 
 
747
 
 
748
 
    def testClashReplace(self):
749
 
        self.doMerge(['aaa'],
750
 
                     ['xxx'],
751
 
                     ['yyy', 'zzz'],
752
 
                     ['<<<<<<< ', 'xxx', '=======', 'yyy', 'zzz', 
753
 
                      '>>>>>>> '])
754
 
 
755
 
    def testNonClashInsert(self):
756
 
        self.doMerge(['aaa'],
757
 
                     ['xxx', 'aaa'],
758
 
                     ['yyy', 'zzz'],
759
 
                     ['<<<<<<< ', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
760
 
                      '>>>>>>> '])
761
 
 
762
 
        self.doMerge(['aaa'],
763
 
                     ['aaa'],
764
 
                     ['yyy', 'zzz'],
765
 
                     ['yyy', 'zzz'])
766
 
 
767
 
 
768
 
    def testDeleteAndModify(self):
769
 
        """Clashing delete and modification.
770
 
 
771
 
        If one side modifies a region and the other deletes it then
772
 
        there should be a conflict with one side blank.
773
 
        """
774
 
 
775
 
        #######################################
776
 
        # skippd, not working yet
777
 
        return
778
 
        
779
 
        self.doMerge(['aaa', 'bbb', 'ccc'],
780
 
                     ['aaa', 'ddd', 'ccc'],
781
 
                     ['aaa', 'ccc'],
782
 
                     ['<<<<<<<< ', 'aaa', '=======', '>>>>>>> ', 'ccc'])
783
 
 
784
 
 
785
 
class JoinWeavesTests(TestBase):
786
 
    def setUp(self):
787
 
        super(JoinWeavesTests, self).setUp()
788
 
        self.weave1 = Weave()
789
 
        self.lines1 = ['hello\n']
790
 
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
791
 
        self.weave1.add_lines('v1', [], self.lines1)
792
 
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
793
 
        self.weave1.add_lines('v3', ['v2'], self.lines3)
794
 
        
795
 
    def test_join_empty(self):
796
 
        """Join two empty weaves."""
797
 
        eq = self.assertEqual
798
 
        w1 = Weave()
799
 
        w2 = Weave()
800
 
        w1.join(w2)
801
 
        eq(len(w1), 0)
802
 
        
803
 
    def test_join_empty_to_nonempty(self):
804
 
        """Join empty weave onto nonempty."""
805
 
        self.weave1.join(Weave())
806
 
        self.assertEqual(len(self.weave1), 3)
807
 
 
808
 
    def test_join_unrelated(self):
809
 
        """Join two weaves with no history in common."""
810
 
        wb = Weave()
811
 
        wb.add_lines('b1', [], ['line from b\n'])
812
 
        w1 = self.weave1
813
 
        w1.join(wb)
814
 
        eq = self.assertEqual
815
 
        eq(len(w1), 4)
816
 
        eq(sorted(w1.versions()),
817
 
           ['b1', 'v1', 'v2', 'v3'])
818
 
 
819
 
    def test_join_related(self):
820
 
        wa = self.weave1.copy()
821
 
        wb = self.weave1.copy()
822
 
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
823
 
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
824
 
        eq = self.assertEquals
825
 
        eq(len(wa), 4)
826
 
        eq(len(wb), 4)
827
 
        wa.join(wb)
828
 
        eq(len(wa), 5)
829
 
        eq(wa.get_lines('b1'),
830
 
           ['hello\n', 'pale blue\n', 'world\n'])
831
 
 
832
 
    def test_join_parent_disagreement(self):
833
 
        #join reconciles differening parents into a union.
834
 
        wa = Weave()
835
 
        wb = Weave()
836
 
        wa.add_lines('v1', [], ['hello\n'])
837
 
        wb.add_lines('v0', [], [])
838
 
        wb.add_lines('v1', ['v0'], ['hello\n'])
839
 
        wa.join(wb)
840
 
        self.assertEqual(['v0'], wa.get_parents('v1'))
841
 
 
842
 
    def test_join_text_disagreement(self):
843
 
        """Cannot join weaves with different texts for a version."""
844
 
        wa = Weave()
845
 
        wb = Weave()
846
 
        wa.add_lines('v1', [], ['hello\n'])
847
 
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
848
 
        self.assertRaises(WeaveError,
849
 
                          wa.join, wb)
850
 
 
851
 
    def test_join_unordered(self):
852
 
        """Join weaves where indexes differ.
853
 
        
854
 
        The source weave contains a different version at index 0."""
855
 
        wa = self.weave1.copy()
856
 
        wb = Weave()
857
 
        wb.add_lines('x1', [], ['line from x1\n'])
858
 
        wb.add_lines('v1', [], ['hello\n'])
859
 
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
860
 
        wa.join(wb)
861
 
        eq = self.assertEquals
862
 
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
863
 
        eq(wa.get_text('x1'), 'line from x1\n')
864
 
 
865
 
    def test_written_detection(self):
866
 
        # Test detection of weave file corruption.
867
 
        #
868
 
        # Make sure that we can detect if a weave file has
869
 
        # been corrupted. This doesn't test all forms of corruption,
870
 
        # but it at least helps verify the data you get, is what you want.
871
 
        from cStringIO import StringIO
872
 
 
873
 
        w = Weave()
874
 
        w.add_lines('v1', [], ['hello\n'])
875
 
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
876
 
 
877
 
        tmpf = StringIO()
878
 
        write_weave(w, tmpf)
879
 
 
880
 
        # Because we are corrupting, we need to make sure we have the exact text
881
 
        self.assertEquals('# bzr weave file v5\n'
882
 
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
883
 
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
884
 
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
885
 
                          tmpf.getvalue())
886
 
 
887
 
        # Change a single letter
888
 
        tmpf = StringIO('# bzr weave file v5\n'
889
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
890
 
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
891
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
892
 
 
893
 
        w = read_weave(tmpf)
894
 
 
895
 
        self.assertEqual('hello\n', w.get_text('v1'))
896
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
897
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
898
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
899
 
 
900
 
        # Change the sha checksum
901
 
        tmpf = StringIO('# bzr weave file v5\n'
902
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
903
 
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
904
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
905
 
 
906
 
        w = read_weave(tmpf)
907
 
 
908
 
        self.assertEqual('hello\n', w.get_text('v1'))
909
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
910
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
911
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
912
 
 
913
 
 
914
 
class InstrumentedWeave(Weave):
915
 
    """Keep track of how many times functions are called."""
916
 
    
917
 
    def __init__(self, weave_name=None):
918
 
        self._extract_count = 0
919
 
        Weave.__init__(self, weave_name=weave_name)
920
 
 
921
 
    def _extract(self, versions):
922
 
        self._extract_count += 1
923
 
        return Weave._extract(self, versions)
924
 
 
925
 
 
926
 
class JoinOptimization(TestCase):
927
 
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
928
 
 
929
 
    def test_join(self):
930
 
        w1 = InstrumentedWeave()
931
 
        w2 = InstrumentedWeave()
932
 
 
933
 
        txt0 = ['a\n']
934
 
        txt1 = ['a\n', 'b\n']
935
 
        txt2 = ['a\n', 'c\n']
936
 
        txt3 = ['a\n', 'b\n', 'c\n']
937
 
 
938
 
        w1.add_lines('txt0', [], txt0) # extract 1a
939
 
        w2.add_lines('txt0', [], txt0) # extract 1b
940
 
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
941
 
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
942
 
        w1.join(w2) # extract 3a to add txt2 
943
 
        w2.join(w1) # extract 3b to add txt1 
944
 
 
945
 
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
946
 
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
947
 
        # These secretly have inverted parents
948
 
 
949
 
        # This should not have to do any extractions
950
 
        w1.join(w2) # NO extract, texts already present with same parents
951
 
        w2.join(w1) # NO extract, texts already present with same parents
952
 
 
953
 
        self.assertEqual(4, w1._extract_count)
954
 
        self.assertEqual(4, w2._extract_count)
955
 
 
956
 
    def test_double_parent(self):
957
 
        # It should not be considered illegal to add
958
 
        # a revision with the same parent twice
959
 
        w1 = InstrumentedWeave()
960
 
        w2 = InstrumentedWeave()
961
 
 
962
 
        txt0 = ['a\n']
963
 
        txt1 = ['a\n', 'b\n']
964
 
        txt2 = ['a\n', 'c\n']
965
 
        txt3 = ['a\n', 'b\n', 'c\n']
966
 
 
967
 
        w1.add_lines('txt0', [], txt0)
968
 
        w2.add_lines('txt0', [], txt0)
969
 
        w1.add_lines('txt1', ['txt0'], txt1)
970
 
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
971
 
        # Same text, effectively the same, because the
972
 
        # parent is only repeated
973
 
        w1.join(w2) # extract 3a to add txt2 
974
 
        w2.join(w1) # extract 3b to add txt1 
975
 
 
976
 
 
977
 
class TestNeedsRweave(TestCase):
978
 
    """Internal corner cases for when reweave is needed."""
979
 
 
980
 
    def test_compatible_parents(self):
981
 
        w1 = Weave('a')
982
 
        my_parents = set([1, 2, 3])
983
 
        # subsets are ok
984
 
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
985
 
        # same sets
986
 
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
987
 
        # same empty corner case
988
 
        self.assertTrue(w1._compatible_parents(set(), set()))
989
 
        # other cannot contain stuff my_parents does not
990
 
        self.assertFalse(w1._compatible_parents(set(), set([1])))
991
 
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
992
 
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
993
 
        
994