~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Martin Pool
  • Date: 2005-07-04 13:08:55 UTC
  • Revision ID: mbp@sourcefrog.net-20050704130854-7fbef398256b8cab
- start code for built-in diff3-style resolve

- test cases for this

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
16
 
 
17
 
 
18
 
# TODO: tests regarding version names
19
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
20
 
#       if it fails.
21
 
 
22
 
"""test suite for weave algorithm"""
23
 
 
24
 
from pprint import pformat
25
 
 
26
 
from bzrlib import (
27
 
    errors,
28
 
    )
29
 
from bzrlib.osutils import sha_string
30
 
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
32
 
from bzrlib.weavefile import write_weave, read_weave
33
 
 
34
 
 
35
 
# texts for use in testing
36
 
TEXT_0 = ["Hello world"]
37
 
TEXT_1 = ["Hello world",
38
 
          "A second line"]
39
 
 
40
 
 
41
 
class TestBase(TestCase):
42
 
 
43
 
    def check_read_write(self, k):
44
 
        """Check the weave k can be written & re-read."""
45
 
        from tempfile import TemporaryFile
46
 
        tf = TemporaryFile()
47
 
 
48
 
        write_weave(k, tf)
49
 
        tf.seek(0)
50
 
        k2 = read_weave(tf)
51
 
 
52
 
        if k != k2:
53
 
            tf.seek(0)
54
 
            self.log('serialized weave:')
55
 
            self.log(tf.read())
56
 
 
57
 
            self.log('')
58
 
            self.log('parents: %s' % (k._parents == k2._parents))
59
 
            self.log('         %r' % k._parents)
60
 
            self.log('         %r' % k2._parents)
61
 
            self.log('')
62
 
            self.fail('read/write check failed')
63
 
 
64
 
 
65
 
class WeaveContains(TestBase):
66
 
    """Weave __contains__ operator"""
67
 
    def runTest(self):
68
 
        k = Weave()
69
 
        self.assertFalse('foo' in k)
70
 
        k.add_lines('foo', [], TEXT_1)
71
 
        self.assertTrue('foo' in k)
72
 
 
73
 
 
74
 
class Easy(TestBase):
75
 
    def runTest(self):
76
 
        k = Weave()
77
 
 
78
 
 
79
 
class AnnotateOne(TestBase):
80
 
    def runTest(self):
81
 
        k = Weave()
82
 
        k.add_lines('text0', [], TEXT_0)
83
 
        self.assertEqual(k.annotate('text0'),
84
 
                         [('text0', TEXT_0[0])])
85
 
 
86
 
 
87
 
class GetSha1(TestBase):
88
 
    def test_get_sha1(self):
89
 
        k = Weave()
90
 
        k.add_lines('text0', [], 'text0')
91
 
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
92
 
                         k.get_sha1('text0'))
93
 
        self.assertRaises(errors.RevisionNotPresent,
94
 
                          k.get_sha1, 0)
95
 
        self.assertRaises(errors.RevisionNotPresent,
96
 
                          k.get_sha1, 'text1')
97
 
                        
98
 
 
99
 
class InvalidAdd(TestBase):
100
 
    """Try to use invalid version number during add."""
101
 
    def runTest(self):
102
 
        k = Weave()
103
 
 
104
 
        self.assertRaises(errors.RevisionNotPresent,
105
 
                          k.add_lines,
106
 
                          'text0',
107
 
                          ['69'],
108
 
                          ['new text!'])
109
 
 
110
 
 
111
 
class RepeatedAdd(TestBase):
112
 
    """Add the same version twice; harmless."""
113
 
 
114
 
    def test_duplicate_add(self):
115
 
        k = Weave()
116
 
        idx = k.add_lines('text0', [], TEXT_0)
117
 
        idx2 = k.add_lines('text0', [], TEXT_0)
118
 
        self.assertEqual(idx, idx2)
119
 
 
120
 
 
121
 
class InvalidRepeatedAdd(TestBase):
122
 
    def runTest(self):
123
 
        k = Weave()
124
 
        k.add_lines('basis', [], TEXT_0)
125
 
        idx = k.add_lines('text0', [], TEXT_0)
126
 
        self.assertRaises(errors.RevisionAlreadyPresent,
127
 
                          k.add_lines,
128
 
                          'text0',
129
 
                          [],
130
 
                          ['not the same text'])
131
 
        self.assertRaises(errors.RevisionAlreadyPresent,
132
 
                          k.add_lines,
133
 
                          'text0',
134
 
                          ['basis'],         # not the right parents
135
 
                          TEXT_0)
136
 
        
137
 
 
138
 
class InsertLines(TestBase):
139
 
    """Store a revision that adds one line to the original.
140
 
 
141
 
    Look at the annotations to make sure that the first line is matched
142
 
    and not stored repeatedly."""
143
 
    def runTest(self):
144
 
        k = Weave()
145
 
 
146
 
        k.add_lines('text0', [], ['line 1'])
147
 
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
148
 
 
149
 
        self.assertEqual(k.annotate('text0'),
150
 
                         [('text0', 'line 1')])
151
 
 
152
 
        self.assertEqual(k.get_lines(1),
153
 
                         ['line 1',
154
 
                          'line 2'])
155
 
 
156
 
        self.assertEqual(k.annotate('text1'),
157
 
                         [('text0', 'line 1'),
158
 
                          ('text1', 'line 2')])
159
 
 
160
 
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
161
 
 
162
 
        self.assertEqual(k.annotate('text2'),
163
 
                         [('text0', 'line 1'),
164
 
                          ('text2', 'diverged line')])
165
 
 
166
 
        text3 = ['line 1', 'middle line', 'line 2']
167
 
        k.add_lines('text3',
168
 
              ['text0', 'text1'],
169
 
              text3)
170
 
 
171
 
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
172
 
 
173
 
        self.log("k._weave=" + pformat(k._weave))
174
 
 
175
 
        self.assertEqual(k.annotate('text3'),
176
 
                         [('text0', 'line 1'),
177
 
                          ('text3', 'middle line'),
178
 
                          ('text1', 'line 2')])
179
 
 
180
 
        # now multiple insertions at different places
181
 
        k.add_lines('text4',
182
 
              ['text0', 'text1', 'text3'],
183
 
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
184
 
 
185
 
        self.assertEqual(k.annotate('text4'), 
186
 
                         [('text0', 'line 1'),
187
 
                          ('text4', 'aaa'),
188
 
                          ('text3', 'middle line'),
189
 
                          ('text4', 'bbb'),
190
 
                          ('text1', 'line 2'),
191
 
                          ('text4', 'ccc')])
192
 
 
193
 
 
194
 
class DeleteLines(TestBase):
195
 
    """Deletion of lines from existing text.
196
 
 
197
 
    Try various texts all based on a common ancestor."""
198
 
    def runTest(self):
199
 
        k = Weave()
200
 
 
201
 
        base_text = ['one', 'two', 'three', 'four']
202
 
 
203
 
        k.add_lines('text0', [], base_text)
204
 
        
205
 
        texts = [['one', 'two', 'three'],
206
 
                 ['two', 'three', 'four'],
207
 
                 ['one', 'four'],
208
 
                 ['one', 'two', 'three', 'four'],
209
 
                 ]
210
 
 
211
 
        i = 1
212
 
        for t in texts:
213
 
            ver = k.add_lines('text%d' % i,
214
 
                        ['text0'], t)
215
 
            i += 1
216
 
 
217
 
        self.log('final weave:')
218
 
        self.log('k._weave=' + pformat(k._weave))
219
 
 
220
 
        for i in range(len(texts)):
221
 
            self.assertEqual(k.get_lines(i+1),
222
 
                             texts[i])
223
 
 
224
 
 
225
 
class SuicideDelete(TestBase):
226
 
    """Invalid weave which tries to add and delete simultaneously."""
227
 
    def runTest(self):
228
 
        k = Weave()
229
 
 
230
 
        k._parents = [(),
231
 
                ]
232
 
        k._weave = [('{', 0),
233
 
                'first line',
234
 
                ('[', 0),
235
 
                'deleted in 0',
236
 
                (']', 0),
237
 
                ('}', 0),
238
 
                ]
239
 
        ################################### SKIPPED
240
 
        # Weave.get doesn't trap this anymore
241
 
        return 
242
 
 
243
 
        self.assertRaises(WeaveFormatError,
244
 
                          k.get_lines,
245
 
                          0)        
246
 
 
247
 
 
248
 
class CannedDelete(TestBase):
249
 
    """Unpack canned weave with deleted lines."""
250
 
    def runTest(self):
251
 
        k = Weave()
252
 
 
253
 
        k._parents = [(),
254
 
                frozenset([0]),
255
 
                ]
256
 
        k._weave = [('{', 0),
257
 
                'first line',
258
 
                ('[', 1),
259
 
                'line to be deleted',
260
 
                (']', 1),
261
 
                'last line',
262
 
                ('}', 0),
263
 
                ]
264
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
265
 
                  , sha_string('first linelast line')]
266
 
 
267
 
        self.assertEqual(k.get_lines(0),
268
 
                         ['first line',
269
 
                          'line to be deleted',
270
 
                          'last line',
271
 
                          ])
272
 
 
273
 
        self.assertEqual(k.get_lines(1),
274
 
                         ['first line',
275
 
                          'last line',
276
 
                          ])
277
 
 
278
 
 
279
 
class CannedReplacement(TestBase):
280
 
    """Unpack canned weave with deleted lines."""
281
 
    def runTest(self):
282
 
        k = Weave()
283
 
 
284
 
        k._parents = [frozenset(),
285
 
                frozenset([0]),
286
 
                ]
287
 
        k._weave = [('{', 0),
288
 
                'first line',
289
 
                ('[', 1),
290
 
                'line to be deleted',
291
 
                (']', 1),
292
 
                ('{', 1),
293
 
                'replacement line',                
294
 
                ('}', 1),
295
 
                'last line',
296
 
                ('}', 0),
297
 
                ]
298
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
299
 
                  , sha_string('first linereplacement linelast line')]
300
 
 
301
 
        self.assertEqual(k.get_lines(0),
302
 
                         ['first line',
303
 
                          'line to be deleted',
304
 
                          'last line',
305
 
                          ])
306
 
 
307
 
        self.assertEqual(k.get_lines(1),
308
 
                         ['first line',
309
 
                          'replacement line',
310
 
                          'last line',
311
 
                          ])
312
 
 
313
 
 
314
 
class BadWeave(TestBase):
315
 
    """Test that we trap an insert which should not occur."""
316
 
    def runTest(self):
317
 
        k = Weave()
318
 
 
319
 
        k._parents = [frozenset(),
320
 
                ]
321
 
        k._weave = ['bad line',
322
 
                ('{', 0),
323
 
                'foo {',
324
 
                ('{', 1),
325
 
                '  added in version 1',
326
 
                ('{', 2),
327
 
                '  added in v2',
328
 
                ('}', 2),
329
 
                '  also from v1',
330
 
                ('}', 1),
331
 
                '}',
332
 
                ('}', 0)]
333
 
 
334
 
        ################################### SKIPPED
335
 
        # Weave.get doesn't trap this anymore
336
 
        return 
337
 
 
338
 
 
339
 
        self.assertRaises(WeaveFormatError,
340
 
                          k.get,
341
 
                          0)
342
 
 
343
 
 
344
 
class BadInsert(TestBase):
345
 
    """Test that we trap an insert which should not occur."""
346
 
    def runTest(self):
347
 
        k = Weave()
348
 
 
349
 
        k._parents = [frozenset(),
350
 
                frozenset([0]),
351
 
                frozenset([0]),
352
 
                frozenset([0,1,2]),
353
 
                ]
354
 
        k._weave = [('{', 0),
355
 
                'foo {',
356
 
                ('{', 1),
357
 
                '  added in version 1',
358
 
                ('{', 1),
359
 
                '  more in 1',
360
 
                ('}', 1),
361
 
                ('}', 1),
362
 
                ('}', 0)]
363
 
 
364
 
 
365
 
        # this is not currently enforced by get
366
 
        return  ##########################################
367
 
 
368
 
        self.assertRaises(WeaveFormatError,
369
 
                          k.get,
370
 
                          0)
371
 
 
372
 
        self.assertRaises(WeaveFormatError,
373
 
                          k.get,
374
 
                          1)
375
 
 
376
 
 
377
 
class InsertNested(TestBase):
378
 
    """Insertion with nested instructions."""
379
 
    def runTest(self):
380
 
        k = Weave()
381
 
 
382
 
        k._parents = [frozenset(),
383
 
                frozenset([0]),
384
 
                frozenset([0]),
385
 
                frozenset([0,1,2]),
386
 
                ]
387
 
        k._weave = [('{', 0),
388
 
                'foo {',
389
 
                ('{', 1),
390
 
                '  added in version 1',
391
 
                ('{', 2),
392
 
                '  added in v2',
393
 
                ('}', 2),
394
 
                '  also from v1',
395
 
                ('}', 1),
396
 
                '}',
397
 
                ('}', 0)]
398
 
 
399
 
        k._sha1s = [sha_string('foo {}')
400
 
                  , sha_string('foo {  added in version 1  also from v1}')
401
 
                  , sha_string('foo {  added in v2}')
402
 
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
403
 
                  ]
404
 
 
405
 
        self.assertEqual(k.get_lines(0),
406
 
                         ['foo {',
407
 
                          '}'])
408
 
 
409
 
        self.assertEqual(k.get_lines(1),
410
 
                         ['foo {',
411
 
                          '  added in version 1',
412
 
                          '  also from v1',
413
 
                          '}'])
414
 
                       
415
 
        self.assertEqual(k.get_lines(2),
416
 
                         ['foo {',
417
 
                          '  added in v2',
418
 
                          '}'])
419
 
 
420
 
        self.assertEqual(k.get_lines(3),
421
 
                         ['foo {',
422
 
                          '  added in version 1',
423
 
                          '  added in v2',
424
 
                          '  also from v1',
425
 
                          '}'])
426
 
                         
427
 
 
428
 
class DeleteLines2(TestBase):
429
 
    """Test recording revisions that delete lines.
430
 
 
431
 
    This relies on the weave having a way to represent lines knocked
432
 
    out by a later revision."""
433
 
    def runTest(self):
434
 
        k = Weave()
435
 
 
436
 
        k.add_lines('text0', [], ["line the first",
437
 
                   "line 2",
438
 
                   "line 3",
439
 
                   "fine"])
440
 
 
441
 
        self.assertEqual(len(k.get_lines(0)), 4)
442
 
 
443
 
        k.add_lines('text1', ['text0'], ["line the first",
444
 
                   "fine"])
445
 
 
446
 
        self.assertEqual(k.get_lines(1),
447
 
                         ["line the first",
448
 
                          "fine"])
449
 
 
450
 
        self.assertEqual(k.annotate('text1'),
451
 
                         [('text0', "line the first"),
452
 
                          ('text0', "fine")])
453
 
 
454
 
 
455
 
class IncludeVersions(TestBase):
456
 
    """Check texts that are stored across multiple revisions.
457
 
 
458
 
    Here we manually create a weave with particular encoding and make
459
 
    sure it unpacks properly.
460
 
 
461
 
    Text 0 includes nothing; text 1 includes text 0 and adds some
462
 
    lines.
463
 
    """
464
 
 
465
 
    def runTest(self):
466
 
        k = Weave()
467
 
 
468
 
        k._parents = [frozenset(), frozenset([0])]
469
 
        k._weave = [('{', 0),
470
 
                "first line",
471
 
                ('}', 0),
472
 
                ('{', 1),
473
 
                "second line",
474
 
                ('}', 1)]
475
 
 
476
 
        k._sha1s = [sha_string('first line')
477
 
                  , sha_string('first linesecond line')]
478
 
 
479
 
        self.assertEqual(k.get_lines(1),
480
 
                         ["first line",
481
 
                          "second line"])
482
 
 
483
 
        self.assertEqual(k.get_lines(0),
484
 
                         ["first line"])
485
 
 
486
 
 
487
 
class DivergedIncludes(TestBase):
488
 
    """Weave with two diverged texts based on version 0.
489
 
    """
490
 
    def runTest(self):
491
 
        # FIXME make the weave, dont poke at it.
492
 
        k = Weave()
493
 
 
494
 
        k._names = ['0', '1', '2']
495
 
        k._name_map = {'0':0, '1':1, '2':2}
496
 
        k._parents = [frozenset(),
497
 
                frozenset([0]),
498
 
                frozenset([0]),
499
 
                ]
500
 
        k._weave = [('{', 0),
501
 
                "first line",
502
 
                ('}', 0),
503
 
                ('{', 1),
504
 
                "second line",
505
 
                ('}', 1),
506
 
                ('{', 2),
507
 
                "alternative second line",
508
 
                ('}', 2),                
509
 
                ]
510
 
 
511
 
        k._sha1s = [sha_string('first line')
512
 
                  , sha_string('first linesecond line')
513
 
                  , sha_string('first linealternative second line')]
514
 
 
515
 
        self.assertEqual(k.get_lines(0),
516
 
                         ["first line"])
517
 
 
518
 
        self.assertEqual(k.get_lines(1),
519
 
                         ["first line",
520
 
                          "second line"])
521
 
 
522
 
        self.assertEqual(k.get_lines('2'),
523
 
                         ["first line",
524
 
                          "alternative second line"])
525
 
 
526
 
        self.assertEqual(list(k.get_ancestry(['2'])),
527
 
                         ['0', '2'])
528
 
 
529
 
 
530
 
class ReplaceLine(TestBase):
531
 
    def runTest(self):
532
 
        k = Weave()
533
 
 
534
 
        text0 = ['cheddar', 'stilton', 'gruyere']
535
 
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
536
 
        
537
 
        k.add_lines('text0', [], text0)
538
 
        k.add_lines('text1', ['text0'], text1)
539
 
 
540
 
        self.log('k._weave=' + pformat(k._weave))
541
 
 
542
 
        self.assertEqual(k.get_lines(0), text0)
543
 
        self.assertEqual(k.get_lines(1), text1)
544
 
 
545
 
 
546
 
class Merge(TestBase):
547
 
    """Storage of versions that merge diverged parents"""
548
 
    def runTest(self):
549
 
        k = Weave()
550
 
 
551
 
        texts = [['header'],
552
 
                 ['header', '', 'line from 1'],
553
 
                 ['header', '', 'line from 2', 'more from 2'],
554
 
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
555
 
                 ]
556
 
 
557
 
        k.add_lines('text0', [], texts[0])
558
 
        k.add_lines('text1', ['text0'], texts[1])
559
 
        k.add_lines('text2', ['text0'], texts[2])
560
 
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
561
 
 
562
 
        for i, t in enumerate(texts):
563
 
            self.assertEqual(k.get_lines(i), t)
564
 
 
565
 
        self.assertEqual(k.annotate('merge'),
566
 
                         [('text0', 'header'),
567
 
                          ('text1', ''),
568
 
                          ('text1', 'line from 1'),
569
 
                          ('merge', 'fixup line'),
570
 
                          ('text2', 'line from 2'),
571
 
                          ])
572
 
 
573
 
        self.assertEqual(list(k.get_ancestry(['merge'])),
574
 
                         ['text0', 'text1', 'text2', 'merge'])
575
 
 
576
 
        self.log('k._weave=' + pformat(k._weave))
577
 
 
578
 
        self.check_read_write(k)
579
 
 
580
 
 
581
 
class Conflicts(TestBase):
582
 
    """Test detection of conflicting regions during a merge.
583
 
 
584
 
    A base version is inserted, then two descendents try to
585
 
    insert different lines in the same place.  These should be
586
 
    reported as a possible conflict and forwarded to the user."""
587
 
    def runTest(self):
588
 
        return  # NOT RUN
589
 
        k = Weave()
590
 
 
591
 
        k.add_lines([], ['aaa', 'bbb'])
592
 
        k.add_lines([0], ['aaa', '111', 'bbb'])
593
 
        k.add_lines([1], ['aaa', '222', 'bbb'])
594
 
 
595
 
        merged = k.merge([1, 2])
596
 
 
597
 
        self.assertEquals([[['aaa']],
598
 
                           [['111'], ['222']],
599
 
                           [['bbb']]])
600
 
 
601
 
 
602
 
class NonConflict(TestBase):
603
 
    """Two descendants insert compatible changes.
604
 
 
605
 
    No conflict should be reported."""
606
 
    def runTest(self):
607
 
        return  # NOT RUN
608
 
        k = Weave()
609
 
 
610
 
        k.add_lines([], ['aaa', 'bbb'])
611
 
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
612
 
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
613
 
 
614
 
 
615
 
class Khayyam(TestBase):
616
 
    """Test changes to multi-line texts, and read/write"""
617
 
 
618
 
    def test_multi_line_merge(self):
619
 
        rawtexts = [
620
 
            """A Book of Verses underneath the Bough,
621
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
622
 
            Beside me singing in the Wilderness --
623
 
            Oh, Wilderness were Paradise enow!""",
624
 
            
625
 
            """A Book of Verses underneath the Bough,
626
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
627
 
            Beside me singing in the Wilderness --
628
 
            Oh, Wilderness were Paradise now!""",
629
 
 
630
 
            """A Book of poems underneath the tree,
631
 
            A Jug of Wine, a Loaf of Bread,
632
 
            and Thou
633
 
            Beside me singing in the Wilderness --
634
 
            Oh, Wilderness were Paradise now!
635
 
 
636
 
            -- O. Khayyam""",
637
 
 
638
 
            """A Book of Verses underneath the Bough,
639
 
            A Jug of Wine, a Loaf of Bread,
640
 
            and Thou
641
 
            Beside me singing in the Wilderness --
642
 
            Oh, Wilderness were Paradise now!""",
643
 
            ]
644
 
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
645
 
 
646
 
        k = Weave()
647
 
        parents = set()
648
 
        i = 0
649
 
        for t in texts:
650
 
            ver = k.add_lines('text%d' % i,
651
 
                        list(parents), t)
652
 
            parents.add('text%d' % i)
653
 
            i += 1
654
 
 
655
 
        self.log("k._weave=" + pformat(k._weave))
656
 
 
657
 
        for i, t in enumerate(texts):
658
 
            self.assertEqual(k.get_lines(i), t)
659
 
 
660
 
        self.check_read_write(k)
661
 
 
662
 
 
663
 
class JoinWeavesTests(TestBase):
664
 
    def setUp(self):
665
 
        super(JoinWeavesTests, self).setUp()
666
 
        self.weave1 = Weave()
667
 
        self.lines1 = ['hello\n']
668
 
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
669
 
        self.weave1.add_lines('v1', [], self.lines1)
670
 
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
671
 
        self.weave1.add_lines('v3', ['v2'], self.lines3)
672
 
        
673
 
    def test_join_empty(self):
674
 
        """Join two empty weaves."""
675
 
        eq = self.assertEqual
676
 
        w1 = Weave()
677
 
        w2 = Weave()
678
 
        w1.join(w2)
679
 
        eq(len(w1), 0)
680
 
        
681
 
    def test_join_empty_to_nonempty(self):
682
 
        """Join empty weave onto nonempty."""
683
 
        self.weave1.join(Weave())
684
 
        self.assertEqual(len(self.weave1), 3)
685
 
 
686
 
    def test_join_unrelated(self):
687
 
        """Join two weaves with no history in common."""
688
 
        wb = Weave()
689
 
        wb.add_lines('b1', [], ['line from b\n'])
690
 
        w1 = self.weave1
691
 
        w1.join(wb)
692
 
        eq = self.assertEqual
693
 
        eq(len(w1), 4)
694
 
        eq(sorted(w1.versions()),
695
 
           ['b1', 'v1', 'v2', 'v3'])
696
 
 
697
 
    def test_join_related(self):
698
 
        wa = self.weave1.copy()
699
 
        wb = self.weave1.copy()
700
 
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
701
 
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
702
 
        eq = self.assertEquals
703
 
        eq(len(wa), 4)
704
 
        eq(len(wb), 4)
705
 
        wa.join(wb)
706
 
        eq(len(wa), 5)
707
 
        eq(wa.get_lines('b1'),
708
 
           ['hello\n', 'pale blue\n', 'world\n'])
709
 
 
710
 
    def test_join_text_disagreement(self):
711
 
        """Cannot join weaves with different texts for a version."""
712
 
        wa = Weave()
713
 
        wb = Weave()
714
 
        wa.add_lines('v1', [], ['hello\n'])
715
 
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
716
 
        self.assertRaises(WeaveError,
717
 
                          wa.join, wb)
718
 
 
719
 
    def test_join_unordered(self):
720
 
        """Join weaves where indexes differ.
721
 
        
722
 
        The source weave contains a different version at index 0."""
723
 
        wa = self.weave1.copy()
724
 
        wb = Weave()
725
 
        wb.add_lines('x1', [], ['line from x1\n'])
726
 
        wb.add_lines('v1', [], ['hello\n'])
727
 
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
728
 
        wa.join(wb)
729
 
        eq = self.assertEquals
730
 
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
731
 
        eq(wa.get_text('x1'), 'line from x1\n')
732
 
 
733
 
    def test_written_detection(self):
734
 
        # Test detection of weave file corruption.
735
 
        #
736
 
        # Make sure that we can detect if a weave file has
737
 
        # been corrupted. This doesn't test all forms of corruption,
738
 
        # but it at least helps verify the data you get, is what you want.
739
 
        from cStringIO import StringIO
740
 
 
741
 
        w = Weave()
742
 
        w.add_lines('v1', [], ['hello\n'])
743
 
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
744
 
 
745
 
        tmpf = StringIO()
746
 
        write_weave(w, tmpf)
747
 
 
748
 
        # Because we are corrupting, we need to make sure we have the exact text
749
 
        self.assertEquals('# bzr weave file v5\n'
750
 
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
751
 
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
752
 
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
753
 
                          tmpf.getvalue())
754
 
 
755
 
        # Change a single letter
756
 
        tmpf = StringIO('# bzr weave file v5\n'
757
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
758
 
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
759
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
760
 
 
761
 
        w = read_weave(tmpf)
762
 
 
763
 
        self.assertEqual('hello\n', w.get_text('v1'))
764
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
765
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
766
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
767
 
 
768
 
        # Change the sha checksum
769
 
        tmpf = StringIO('# bzr weave file v5\n'
770
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
771
 
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
772
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
773
 
 
774
 
        w = read_weave(tmpf)
775
 
 
776
 
        self.assertEqual('hello\n', w.get_text('v1'))
777
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
778
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
779
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
780
 
 
781
 
 
782
 
class InstrumentedWeave(Weave):
783
 
    """Keep track of how many times functions are called."""
784
 
    
785
 
    def __init__(self, weave_name=None):
786
 
        self._extract_count = 0
787
 
        Weave.__init__(self, weave_name=weave_name)
788
 
 
789
 
    def _extract(self, versions):
790
 
        self._extract_count += 1
791
 
        return Weave._extract(self, versions)
792
 
 
793
 
 
794
 
class JoinOptimization(TestCase):
795
 
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
796
 
 
797
 
    def test_join(self):
798
 
        w1 = InstrumentedWeave()
799
 
        w2 = InstrumentedWeave()
800
 
 
801
 
        txt0 = ['a\n']
802
 
        txt1 = ['a\n', 'b\n']
803
 
        txt2 = ['a\n', 'c\n']
804
 
        txt3 = ['a\n', 'b\n', 'c\n']
805
 
 
806
 
        w1.add_lines('txt0', [], txt0) # extract 1a
807
 
        w2.add_lines('txt0', [], txt0) # extract 1b
808
 
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
809
 
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
810
 
        w1.join(w2) # extract 3a to add txt2 
811
 
        w2.join(w1) # extract 3b to add txt1 
812
 
 
813
 
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
814
 
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
815
 
        # These secretly have inverted parents
816
 
 
817
 
        # This should not have to do any extractions
818
 
        w1.join(w2) # NO extract, texts already present with same parents
819
 
        w2.join(w1) # NO extract, texts already present with same parents
820
 
 
821
 
        self.assertEqual(4, w1._extract_count)
822
 
        self.assertEqual(4, w2._extract_count)
823
 
 
824
 
    def test_double_parent(self):
825
 
        # It should not be considered illegal to add
826
 
        # a revision with the same parent twice
827
 
        w1 = InstrumentedWeave()
828
 
        w2 = InstrumentedWeave()
829
 
 
830
 
        txt0 = ['a\n']
831
 
        txt1 = ['a\n', 'b\n']
832
 
        txt2 = ['a\n', 'c\n']
833
 
        txt3 = ['a\n', 'b\n', 'c\n']
834
 
 
835
 
        w1.add_lines('txt0', [], txt0)
836
 
        w2.add_lines('txt0', [], txt0)
837
 
        w1.add_lines('txt1', ['txt0'], txt1)
838
 
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
839
 
        # Same text, effectively the same, because the
840
 
        # parent is only repeated
841
 
        w1.join(w2) # extract 3a to add txt2 
842
 
        w2.join(w1) # extract 3b to add txt1 
843
 
 
844
 
 
845
 
class TestNeedsReweave(TestCase):
846
 
    """Internal corner cases for when reweave is needed."""
847
 
 
848
 
    def test_compatible_parents(self):
849
 
        w1 = Weave('a')
850
 
        my_parents = set([1, 2, 3])
851
 
        # subsets are ok
852
 
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
853
 
        # same sets
854
 
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
855
 
        # same empty corner case
856
 
        self.assertTrue(w1._compatible_parents(set(), set()))
857
 
        # other cannot contain stuff my_parents does not
858
 
        self.assertFalse(w1._compatible_parents(set(), set([1])))
859
 
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
860
 
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
861
 
 
862
 
 
863
 
class TestWeaveFile(TestCaseInTempDir):
864
 
    
865
 
    def test_empty_file(self):
866
 
        f = open('empty.weave', 'wb+')
867
 
        try:
868
 
            self.assertRaises(errors.WeaveFormatError,
869
 
                              read_weave, f)
870
 
        finally:
871
 
            f.close()