~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Martin Pool
  • Date: 2005-06-21 06:10:01 UTC
  • Revision ID: mbp@sourcefrog.net-20050621061001-f9da0e19b08848b0
- cleanup

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
#! /usr/bin/python2.4
2
 
 
3
 
# Copyright (C) 2005 Canonical Ltd
4
 
#
5
 
# This program is free software; you can redistribute it and/or modify
6
 
# it under the terms of the GNU General Public License as published by
7
 
# the Free Software Foundation; either version 2 of the License, or
8
 
# (at your option) any later version.
9
 
#
10
 
# This program is distributed in the hope that it will be useful,
11
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
 
# GNU General Public License for more details.
14
 
#
15
 
# You should have received a copy of the GNU General Public License
16
 
# along with this program; if not, write to the Free Software
17
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18
 
 
19
 
 
20
 
# TODO: tests regarding version names
21
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
22
 
#       if it fails.
23
 
 
24
 
"""test suite for weave algorithm"""
25
 
 
26
 
from pprint import pformat
27
 
 
28
 
from bzrlib import (
29
 
    errors,
30
 
    )
31
 
from bzrlib.osutils import sha_string
32
 
from bzrlib.tests import TestCase, TestCaseInTempDir
33
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
34
 
from bzrlib.weavefile import write_weave, read_weave
35
 
 
36
 
 
37
 
# texts for use in testing
38
 
TEXT_0 = ["Hello world"]
39
 
TEXT_1 = ["Hello world",
40
 
          "A second line"]
41
 
 
42
 
 
43
 
class TestBase(TestCase):
44
 
    def check_read_write(self, k):
45
 
        """Check the weave k can be written & re-read."""
46
 
        from tempfile import TemporaryFile
47
 
        tf = TemporaryFile()
48
 
 
49
 
        write_weave(k, tf)
50
 
        tf.seek(0)
51
 
        k2 = read_weave(tf)
52
 
 
53
 
        if k != k2:
54
 
            tf.seek(0)
55
 
            self.log('serialized weave:')
56
 
            self.log(tf.read())
57
 
 
58
 
            self.log('')
59
 
            self.log('parents: %s' % (k._parents == k2._parents))
60
 
            self.log('         %r' % k._parents)
61
 
            self.log('         %r' % k2._parents)
62
 
            self.log('')
63
 
            self.fail('read/write check failed')
64
 
 
65
 
 
66
 
class WeaveContains(TestBase):
67
 
    """Weave __contains__ operator"""
68
 
    def runTest(self):
69
 
        k = Weave()
70
 
        self.assertFalse('foo' in k)
71
 
        k.add_lines('foo', [], TEXT_1)
72
 
        self.assertTrue('foo' in k)
73
 
 
74
 
 
75
 
class Easy(TestBase):
76
 
    def runTest(self):
77
 
        k = Weave()
78
 
 
79
 
 
80
 
class StoreText(TestBase):
81
 
    """Store and retrieve a simple text."""
82
 
 
83
 
    def test_storing_text(self):
84
 
        k = Weave()
85
 
        idx = k.add_lines('text0', [], TEXT_0)
86
 
        self.assertEqual(k.get_lines(idx), TEXT_0)
87
 
        self.assertEqual(idx, 0)
88
 
 
89
 
 
90
 
class AnnotateOne(TestBase):
91
 
    def runTest(self):
92
 
        k = Weave()
93
 
        k.add_lines('text0', [], TEXT_0)
94
 
        self.assertEqual(k.annotate('text0'),
95
 
                         [('text0', TEXT_0[0])])
96
 
 
97
 
 
98
 
class StoreTwo(TestBase):
99
 
    def runTest(self):
100
 
        k = Weave()
101
 
 
102
 
        idx = k.add_lines('text0', [], TEXT_0)
103
 
        self.assertEqual(idx, 0)
104
 
 
105
 
        idx = k.add_lines('text1', [], TEXT_1)
106
 
        self.assertEqual(idx, 1)
107
 
 
108
 
        self.assertEqual(k.get_lines(0), TEXT_0)
109
 
        self.assertEqual(k.get_lines(1), TEXT_1)
110
 
 
111
 
 
112
 
class GetSha1(TestBase):
113
 
    def test_get_sha1(self):
114
 
        k = Weave()
115
 
        k.add_lines('text0', [], 'text0')
116
 
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
117
 
                         k.get_sha1('text0'))
118
 
        self.assertRaises(errors.RevisionNotPresent,
119
 
                          k.get_sha1, 0)
120
 
        self.assertRaises(errors.RevisionNotPresent,
121
 
                          k.get_sha1, 'text1')
122
 
                        
123
 
 
124
 
class InvalidAdd(TestBase):
125
 
    """Try to use invalid version number during add."""
126
 
    def runTest(self):
127
 
        k = Weave()
128
 
 
129
 
        self.assertRaises(errors.RevisionNotPresent,
130
 
                          k.add_lines,
131
 
                          'text0',
132
 
                          ['69'],
133
 
                          ['new text!'])
134
 
 
135
 
 
136
 
class RepeatedAdd(TestBase):
137
 
    """Add the same version twice; harmless."""
138
 
    def runTest(self):
139
 
        k = Weave()
140
 
        idx = k.add_lines('text0', [], TEXT_0)
141
 
        idx2 = k.add_lines('text0', [], TEXT_0)
142
 
        self.assertEqual(idx, idx2)
143
 
 
144
 
 
145
 
class InvalidRepeatedAdd(TestBase):
146
 
    def runTest(self):
147
 
        k = Weave()
148
 
        k.add_lines('basis', [], TEXT_0)
149
 
        idx = k.add_lines('text0', [], TEXT_0)
150
 
        self.assertRaises(errors.RevisionAlreadyPresent,
151
 
                          k.add_lines,
152
 
                          'text0',
153
 
                          [],
154
 
                          ['not the same text'])
155
 
        self.assertRaises(errors.RevisionAlreadyPresent,
156
 
                          k.add_lines,
157
 
                          'text0',
158
 
                          ['basis'],         # not the right parents
159
 
                          TEXT_0)
160
 
        
161
 
 
162
 
class InsertLines(TestBase):
163
 
    """Store a revision that adds one line to the original.
164
 
 
165
 
    Look at the annotations to make sure that the first line is matched
166
 
    and not stored repeatedly."""
167
 
    def runTest(self):
168
 
        k = Weave()
169
 
 
170
 
        k.add_lines('text0', [], ['line 1'])
171
 
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
172
 
 
173
 
        self.assertEqual(k.annotate('text0'),
174
 
                         [('text0', 'line 1')])
175
 
 
176
 
        self.assertEqual(k.get_lines(1),
177
 
                         ['line 1',
178
 
                          'line 2'])
179
 
 
180
 
        self.assertEqual(k.annotate('text1'),
181
 
                         [('text0', 'line 1'),
182
 
                          ('text1', 'line 2')])
183
 
 
184
 
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
185
 
 
186
 
        self.assertEqual(k.annotate('text2'),
187
 
                         [('text0', 'line 1'),
188
 
                          ('text2', 'diverged line')])
189
 
 
190
 
        text3 = ['line 1', 'middle line', 'line 2']
191
 
        k.add_lines('text3',
192
 
              ['text0', 'text1'],
193
 
              text3)
194
 
 
195
 
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
196
 
 
197
 
        self.log("k._weave=" + pformat(k._weave))
198
 
 
199
 
        self.assertEqual(k.annotate('text3'),
200
 
                         [('text0', 'line 1'),
201
 
                          ('text3', 'middle line'),
202
 
                          ('text1', 'line 2')])
203
 
 
204
 
        # now multiple insertions at different places
205
 
        k.add_lines('text4',
206
 
              ['text0', 'text1', 'text3'],
207
 
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
208
 
 
209
 
        self.assertEqual(k.annotate('text4'), 
210
 
                         [('text0', 'line 1'),
211
 
                          ('text4', 'aaa'),
212
 
                          ('text3', 'middle line'),
213
 
                          ('text4', 'bbb'),
214
 
                          ('text1', 'line 2'),
215
 
                          ('text4', 'ccc')])
216
 
 
217
 
 
218
 
class DeleteLines(TestBase):
219
 
    """Deletion of lines from existing text.
220
 
 
221
 
    Try various texts all based on a common ancestor."""
222
 
    def runTest(self):
223
 
        k = Weave()
224
 
 
225
 
        base_text = ['one', 'two', 'three', 'four']
226
 
 
227
 
        k.add_lines('text0', [], base_text)
228
 
        
229
 
        texts = [['one', 'two', 'three'],
230
 
                 ['two', 'three', 'four'],
231
 
                 ['one', 'four'],
232
 
                 ['one', 'two', 'three', 'four'],
233
 
                 ]
234
 
 
235
 
        i = 1
236
 
        for t in texts:
237
 
            ver = k.add_lines('text%d' % i,
238
 
                        ['text0'], t)
239
 
            i += 1
240
 
 
241
 
        self.log('final weave:')
242
 
        self.log('k._weave=' + pformat(k._weave))
243
 
 
244
 
        for i in range(len(texts)):
245
 
            self.assertEqual(k.get_lines(i+1),
246
 
                             texts[i])
247
 
 
248
 
 
249
 
class SuicideDelete(TestBase):
250
 
    """Invalid weave which tries to add and delete simultaneously."""
251
 
    def runTest(self):
252
 
        k = Weave()
253
 
 
254
 
        k._parents = [(),
255
 
                ]
256
 
        k._weave = [('{', 0),
257
 
                'first line',
258
 
                ('[', 0),
259
 
                'deleted in 0',
260
 
                (']', 0),
261
 
                ('}', 0),
262
 
                ]
263
 
        ################################### SKIPPED
264
 
        # Weave.get doesn't trap this anymore
265
 
        return 
266
 
 
267
 
        self.assertRaises(WeaveFormatError,
268
 
                          k.get_lines,
269
 
                          0)        
270
 
 
271
 
 
272
 
class CannedDelete(TestBase):
273
 
    """Unpack canned weave with deleted lines."""
274
 
    def runTest(self):
275
 
        k = Weave()
276
 
 
277
 
        k._parents = [(),
278
 
                frozenset([0]),
279
 
                ]
280
 
        k._weave = [('{', 0),
281
 
                'first line',
282
 
                ('[', 1),
283
 
                'line to be deleted',
284
 
                (']', 1),
285
 
                'last line',
286
 
                ('}', 0),
287
 
                ]
288
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
289
 
                  , sha_string('first linelast line')]
290
 
 
291
 
        self.assertEqual(k.get_lines(0),
292
 
                         ['first line',
293
 
                          'line to be deleted',
294
 
                          'last line',
295
 
                          ])
296
 
 
297
 
        self.assertEqual(k.get_lines(1),
298
 
                         ['first line',
299
 
                          'last line',
300
 
                          ])
301
 
 
302
 
 
303
 
class CannedReplacement(TestBase):
304
 
    """Unpack canned weave with deleted lines."""
305
 
    def runTest(self):
306
 
        k = Weave()
307
 
 
308
 
        k._parents = [frozenset(),
309
 
                frozenset([0]),
310
 
                ]
311
 
        k._weave = [('{', 0),
312
 
                'first line',
313
 
                ('[', 1),
314
 
                'line to be deleted',
315
 
                (']', 1),
316
 
                ('{', 1),
317
 
                'replacement line',                
318
 
                ('}', 1),
319
 
                'last line',
320
 
                ('}', 0),
321
 
                ]
322
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
323
 
                  , sha_string('first linereplacement linelast line')]
324
 
 
325
 
        self.assertEqual(k.get_lines(0),
326
 
                         ['first line',
327
 
                          'line to be deleted',
328
 
                          'last line',
329
 
                          ])
330
 
 
331
 
        self.assertEqual(k.get_lines(1),
332
 
                         ['first line',
333
 
                          'replacement line',
334
 
                          'last line',
335
 
                          ])
336
 
 
337
 
 
338
 
class BadWeave(TestBase):
339
 
    """Test that we trap an insert which should not occur."""
340
 
    def runTest(self):
341
 
        k = Weave()
342
 
 
343
 
        k._parents = [frozenset(),
344
 
                ]
345
 
        k._weave = ['bad line',
346
 
                ('{', 0),
347
 
                'foo {',
348
 
                ('{', 1),
349
 
                '  added in version 1',
350
 
                ('{', 2),
351
 
                '  added in v2',
352
 
                ('}', 2),
353
 
                '  also from v1',
354
 
                ('}', 1),
355
 
                '}',
356
 
                ('}', 0)]
357
 
 
358
 
        ################################### SKIPPED
359
 
        # Weave.get doesn't trap this anymore
360
 
        return 
361
 
 
362
 
 
363
 
        self.assertRaises(WeaveFormatError,
364
 
                          k.get,
365
 
                          0)
366
 
 
367
 
 
368
 
class BadInsert(TestBase):
369
 
    """Test that we trap an insert which should not occur."""
370
 
    def runTest(self):
371
 
        k = Weave()
372
 
 
373
 
        k._parents = [frozenset(),
374
 
                frozenset([0]),
375
 
                frozenset([0]),
376
 
                frozenset([0,1,2]),
377
 
                ]
378
 
        k._weave = [('{', 0),
379
 
                'foo {',
380
 
                ('{', 1),
381
 
                '  added in version 1',
382
 
                ('{', 1),
383
 
                '  more in 1',
384
 
                ('}', 1),
385
 
                ('}', 1),
386
 
                ('}', 0)]
387
 
 
388
 
 
389
 
        # this is not currently enforced by get
390
 
        return  ##########################################
391
 
 
392
 
        self.assertRaises(WeaveFormatError,
393
 
                          k.get,
394
 
                          0)
395
 
 
396
 
        self.assertRaises(WeaveFormatError,
397
 
                          k.get,
398
 
                          1)
399
 
 
400
 
 
401
 
class InsertNested(TestBase):
402
 
    """Insertion with nested instructions."""
403
 
    def runTest(self):
404
 
        k = Weave()
405
 
 
406
 
        k._parents = [frozenset(),
407
 
                frozenset([0]),
408
 
                frozenset([0]),
409
 
                frozenset([0,1,2]),
410
 
                ]
411
 
        k._weave = [('{', 0),
412
 
                'foo {',
413
 
                ('{', 1),
414
 
                '  added in version 1',
415
 
                ('{', 2),
416
 
                '  added in v2',
417
 
                ('}', 2),
418
 
                '  also from v1',
419
 
                ('}', 1),
420
 
                '}',
421
 
                ('}', 0)]
422
 
 
423
 
        k._sha1s = [sha_string('foo {}')
424
 
                  , sha_string('foo {  added in version 1  also from v1}')
425
 
                  , sha_string('foo {  added in v2}')
426
 
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
427
 
                  ]
428
 
 
429
 
        self.assertEqual(k.get_lines(0),
430
 
                         ['foo {',
431
 
                          '}'])
432
 
 
433
 
        self.assertEqual(k.get_lines(1),
434
 
                         ['foo {',
435
 
                          '  added in version 1',
436
 
                          '  also from v1',
437
 
                          '}'])
438
 
                       
439
 
        self.assertEqual(k.get_lines(2),
440
 
                         ['foo {',
441
 
                          '  added in v2',
442
 
                          '}'])
443
 
 
444
 
        self.assertEqual(k.get_lines(3),
445
 
                         ['foo {',
446
 
                          '  added in version 1',
447
 
                          '  added in v2',
448
 
                          '  also from v1',
449
 
                          '}'])
450
 
                         
451
 
 
452
 
class DeleteLines2(TestBase):
453
 
    """Test recording revisions that delete lines.
454
 
 
455
 
    This relies on the weave having a way to represent lines knocked
456
 
    out by a later revision."""
457
 
    def runTest(self):
458
 
        k = Weave()
459
 
 
460
 
        k.add_lines('text0', [], ["line the first",
461
 
                   "line 2",
462
 
                   "line 3",
463
 
                   "fine"])
464
 
 
465
 
        self.assertEqual(len(k.get_lines(0)), 4)
466
 
 
467
 
        k.add_lines('text1', ['text0'], ["line the first",
468
 
                   "fine"])
469
 
 
470
 
        self.assertEqual(k.get_lines(1),
471
 
                         ["line the first",
472
 
                          "fine"])
473
 
 
474
 
        self.assertEqual(k.annotate('text1'),
475
 
                         [('text0', "line the first"),
476
 
                          ('text0', "fine")])
477
 
 
478
 
 
479
 
class IncludeVersions(TestBase):
480
 
    """Check texts that are stored across multiple revisions.
481
 
 
482
 
    Here we manually create a weave with particular encoding and make
483
 
    sure it unpacks properly.
484
 
 
485
 
    Text 0 includes nothing; text 1 includes text 0 and adds some
486
 
    lines.
487
 
    """
488
 
 
489
 
    def runTest(self):
490
 
        k = Weave()
491
 
 
492
 
        k._parents = [frozenset(), frozenset([0])]
493
 
        k._weave = [('{', 0),
494
 
                "first line",
495
 
                ('}', 0),
496
 
                ('{', 1),
497
 
                "second line",
498
 
                ('}', 1)]
499
 
 
500
 
        k._sha1s = [sha_string('first line')
501
 
                  , sha_string('first linesecond line')]
502
 
 
503
 
        self.assertEqual(k.get_lines(1),
504
 
                         ["first line",
505
 
                          "second line"])
506
 
 
507
 
        self.assertEqual(k.get_lines(0),
508
 
                         ["first line"])
509
 
 
510
 
 
511
 
class DivergedIncludes(TestBase):
512
 
    """Weave with two diverged texts based on version 0.
513
 
    """
514
 
    def runTest(self):
515
 
        # FIXME make the weave, dont poke at it.
516
 
        k = Weave()
517
 
 
518
 
        k._names = ['0', '1', '2']
519
 
        k._name_map = {'0':0, '1':1, '2':2}
520
 
        k._parents = [frozenset(),
521
 
                frozenset([0]),
522
 
                frozenset([0]),
523
 
                ]
524
 
        k._weave = [('{', 0),
525
 
                "first line",
526
 
                ('}', 0),
527
 
                ('{', 1),
528
 
                "second line",
529
 
                ('}', 1),
530
 
                ('{', 2),
531
 
                "alternative second line",
532
 
                ('}', 2),                
533
 
                ]
534
 
 
535
 
        k._sha1s = [sha_string('first line')
536
 
                  , sha_string('first linesecond line')
537
 
                  , sha_string('first linealternative second line')]
538
 
 
539
 
        self.assertEqual(k.get_lines(0),
540
 
                         ["first line"])
541
 
 
542
 
        self.assertEqual(k.get_lines(1),
543
 
                         ["first line",
544
 
                          "second line"])
545
 
 
546
 
        self.assertEqual(k.get_lines('2'),
547
 
                         ["first line",
548
 
                          "alternative second line"])
549
 
 
550
 
        self.assertEqual(list(k.get_ancestry(['2'])),
551
 
                         ['0', '2'])
552
 
 
553
 
 
554
 
class ReplaceLine(TestBase):
555
 
    def runTest(self):
556
 
        k = Weave()
557
 
 
558
 
        text0 = ['cheddar', 'stilton', 'gruyere']
559
 
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
560
 
        
561
 
        k.add_lines('text0', [], text0)
562
 
        k.add_lines('text1', ['text0'], text1)
563
 
 
564
 
        self.log('k._weave=' + pformat(k._weave))
565
 
 
566
 
        self.assertEqual(k.get_lines(0), text0)
567
 
        self.assertEqual(k.get_lines(1), text1)
568
 
 
569
 
 
570
 
class Merge(TestBase):
571
 
    """Storage of versions that merge diverged parents"""
572
 
    def runTest(self):
573
 
        k = Weave()
574
 
 
575
 
        texts = [['header'],
576
 
                 ['header', '', 'line from 1'],
577
 
                 ['header', '', 'line from 2', 'more from 2'],
578
 
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
579
 
                 ]
580
 
 
581
 
        k.add_lines('text0', [], texts[0])
582
 
        k.add_lines('text1', ['text0'], texts[1])
583
 
        k.add_lines('text2', ['text0'], texts[2])
584
 
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
585
 
 
586
 
        for i, t in enumerate(texts):
587
 
            self.assertEqual(k.get_lines(i), t)
588
 
 
589
 
        self.assertEqual(k.annotate('merge'),
590
 
                         [('text0', 'header'),
591
 
                          ('text1', ''),
592
 
                          ('text1', 'line from 1'),
593
 
                          ('merge', 'fixup line'),
594
 
                          ('text2', 'line from 2'),
595
 
                          ])
596
 
 
597
 
        self.assertEqual(list(k.get_ancestry(['merge'])),
598
 
                         ['text0', 'text1', 'text2', 'merge'])
599
 
 
600
 
        self.log('k._weave=' + pformat(k._weave))
601
 
 
602
 
        self.check_read_write(k)
603
 
 
604
 
 
605
 
class Conflicts(TestBase):
606
 
    """Test detection of conflicting regions during a merge.
607
 
 
608
 
    A base version is inserted, then two descendents try to
609
 
    insert different lines in the same place.  These should be
610
 
    reported as a possible conflict and forwarded to the user."""
611
 
    def runTest(self):
612
 
        return  # NOT RUN
613
 
        k = Weave()
614
 
 
615
 
        k.add_lines([], ['aaa', 'bbb'])
616
 
        k.add_lines([0], ['aaa', '111', 'bbb'])
617
 
        k.add_lines([1], ['aaa', '222', 'bbb'])
618
 
 
619
 
        merged = k.merge([1, 2])
620
 
 
621
 
        self.assertEquals([[['aaa']],
622
 
                           [['111'], ['222']],
623
 
                           [['bbb']]])
624
 
 
625
 
 
626
 
class NonConflict(TestBase):
627
 
    """Two descendants insert compatible changes.
628
 
 
629
 
    No conflict should be reported."""
630
 
    def runTest(self):
631
 
        return  # NOT RUN
632
 
        k = Weave()
633
 
 
634
 
        k.add_lines([], ['aaa', 'bbb'])
635
 
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
636
 
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
637
 
 
638
 
 
639
 
class Khayyam(TestBase):
640
 
    """Test changes to multi-line texts, and read/write"""
641
 
 
642
 
    def test_multi_line_merge(self):
643
 
        rawtexts = [
644
 
            """A Book of Verses underneath the Bough,
645
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
646
 
            Beside me singing in the Wilderness --
647
 
            Oh, Wilderness were Paradise enow!""",
648
 
            
649
 
            """A Book of Verses underneath the Bough,
650
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
651
 
            Beside me singing in the Wilderness --
652
 
            Oh, Wilderness were Paradise now!""",
653
 
 
654
 
            """A Book of poems underneath the tree,
655
 
            A Jug of Wine, a Loaf of Bread,
656
 
            and Thou
657
 
            Beside me singing in the Wilderness --
658
 
            Oh, Wilderness were Paradise now!
659
 
 
660
 
            -- O. Khayyam""",
661
 
 
662
 
            """A Book of Verses underneath the Bough,
663
 
            A Jug of Wine, a Loaf of Bread,
664
 
            and Thou
665
 
            Beside me singing in the Wilderness --
666
 
            Oh, Wilderness were Paradise now!""",
667
 
            ]
668
 
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
669
 
 
670
 
        k = Weave()
671
 
        parents = set()
672
 
        i = 0
673
 
        for t in texts:
674
 
            ver = k.add_lines('text%d' % i,
675
 
                        list(parents), t)
676
 
            parents.add('text%d' % i)
677
 
            i += 1
678
 
 
679
 
        self.log("k._weave=" + pformat(k._weave))
680
 
 
681
 
        for i, t in enumerate(texts):
682
 
            self.assertEqual(k.get_lines(i), t)
683
 
 
684
 
        self.check_read_write(k)
685
 
 
686
 
 
687
 
class JoinWeavesTests(TestBase):
688
 
    def setUp(self):
689
 
        super(JoinWeavesTests, self).setUp()
690
 
        self.weave1 = Weave()
691
 
        self.lines1 = ['hello\n']
692
 
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
693
 
        self.weave1.add_lines('v1', [], self.lines1)
694
 
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
695
 
        self.weave1.add_lines('v3', ['v2'], self.lines3)
696
 
        
697
 
    def test_join_empty(self):
698
 
        """Join two empty weaves."""
699
 
        eq = self.assertEqual
700
 
        w1 = Weave()
701
 
        w2 = Weave()
702
 
        w1.join(w2)
703
 
        eq(len(w1), 0)
704
 
        
705
 
    def test_join_empty_to_nonempty(self):
706
 
        """Join empty weave onto nonempty."""
707
 
        self.weave1.join(Weave())
708
 
        self.assertEqual(len(self.weave1), 3)
709
 
 
710
 
    def test_join_unrelated(self):
711
 
        """Join two weaves with no history in common."""
712
 
        wb = Weave()
713
 
        wb.add_lines('b1', [], ['line from b\n'])
714
 
        w1 = self.weave1
715
 
        w1.join(wb)
716
 
        eq = self.assertEqual
717
 
        eq(len(w1), 4)
718
 
        eq(sorted(w1.versions()),
719
 
           ['b1', 'v1', 'v2', 'v3'])
720
 
 
721
 
    def test_join_related(self):
722
 
        wa = self.weave1.copy()
723
 
        wb = self.weave1.copy()
724
 
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
725
 
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
726
 
        eq = self.assertEquals
727
 
        eq(len(wa), 4)
728
 
        eq(len(wb), 4)
729
 
        wa.join(wb)
730
 
        eq(len(wa), 5)
731
 
        eq(wa.get_lines('b1'),
732
 
           ['hello\n', 'pale blue\n', 'world\n'])
733
 
 
734
 
    def test_join_parent_disagreement(self):
735
 
        #join reconciles differening parents into a union.
736
 
        wa = Weave()
737
 
        wb = Weave()
738
 
        wa.add_lines('v1', [], ['hello\n'])
739
 
        wb.add_lines('v0', [], [])
740
 
        wb.add_lines('v1', ['v0'], ['hello\n'])
741
 
        wa.join(wb)
742
 
        self.assertEqual(['v0'], wa.get_parents('v1'))
743
 
 
744
 
    def test_join_text_disagreement(self):
745
 
        """Cannot join weaves with different texts for a version."""
746
 
        wa = Weave()
747
 
        wb = Weave()
748
 
        wa.add_lines('v1', [], ['hello\n'])
749
 
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
750
 
        self.assertRaises(WeaveError,
751
 
                          wa.join, wb)
752
 
 
753
 
    def test_join_unordered(self):
754
 
        """Join weaves where indexes differ.
755
 
        
756
 
        The source weave contains a different version at index 0."""
757
 
        wa = self.weave1.copy()
758
 
        wb = Weave()
759
 
        wb.add_lines('x1', [], ['line from x1\n'])
760
 
        wb.add_lines('v1', [], ['hello\n'])
761
 
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
762
 
        wa.join(wb)
763
 
        eq = self.assertEquals
764
 
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
765
 
        eq(wa.get_text('x1'), 'line from x1\n')
766
 
 
767
 
    def test_written_detection(self):
768
 
        # Test detection of weave file corruption.
769
 
        #
770
 
        # Make sure that we can detect if a weave file has
771
 
        # been corrupted. This doesn't test all forms of corruption,
772
 
        # but it at least helps verify the data you get, is what you want.
773
 
        from cStringIO import StringIO
774
 
 
775
 
        w = Weave()
776
 
        w.add_lines('v1', [], ['hello\n'])
777
 
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
778
 
 
779
 
        tmpf = StringIO()
780
 
        write_weave(w, tmpf)
781
 
 
782
 
        # Because we are corrupting, we need to make sure we have the exact text
783
 
        self.assertEquals('# bzr weave file v5\n'
784
 
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
785
 
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
786
 
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
787
 
                          tmpf.getvalue())
788
 
 
789
 
        # Change a single letter
790
 
        tmpf = StringIO('# bzr weave file v5\n'
791
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
792
 
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
793
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
794
 
 
795
 
        w = read_weave(tmpf)
796
 
 
797
 
        self.assertEqual('hello\n', w.get_text('v1'))
798
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
799
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
800
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
801
 
 
802
 
        # Change the sha checksum
803
 
        tmpf = StringIO('# bzr weave file v5\n'
804
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
805
 
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
806
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
807
 
 
808
 
        w = read_weave(tmpf)
809
 
 
810
 
        self.assertEqual('hello\n', w.get_text('v1'))
811
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
812
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
813
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
814
 
 
815
 
 
816
 
class InstrumentedWeave(Weave):
817
 
    """Keep track of how many times functions are called."""
818
 
    
819
 
    def __init__(self, weave_name=None):
820
 
        self._extract_count = 0
821
 
        Weave.__init__(self, weave_name=weave_name)
822
 
 
823
 
    def _extract(self, versions):
824
 
        self._extract_count += 1
825
 
        return Weave._extract(self, versions)
826
 
 
827
 
 
828
 
class JoinOptimization(TestCase):
829
 
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
830
 
 
831
 
    def test_join(self):
832
 
        w1 = InstrumentedWeave()
833
 
        w2 = InstrumentedWeave()
834
 
 
835
 
        txt0 = ['a\n']
836
 
        txt1 = ['a\n', 'b\n']
837
 
        txt2 = ['a\n', 'c\n']
838
 
        txt3 = ['a\n', 'b\n', 'c\n']
839
 
 
840
 
        w1.add_lines('txt0', [], txt0) # extract 1a
841
 
        w2.add_lines('txt0', [], txt0) # extract 1b
842
 
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
843
 
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
844
 
        w1.join(w2) # extract 3a to add txt2 
845
 
        w2.join(w1) # extract 3b to add txt1 
846
 
 
847
 
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
848
 
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
849
 
        # These secretly have inverted parents
850
 
 
851
 
        # This should not have to do any extractions
852
 
        w1.join(w2) # NO extract, texts already present with same parents
853
 
        w2.join(w1) # NO extract, texts already present with same parents
854
 
 
855
 
        self.assertEqual(4, w1._extract_count)
856
 
        self.assertEqual(4, w2._extract_count)
857
 
 
858
 
    def test_double_parent(self):
859
 
        # It should not be considered illegal to add
860
 
        # a revision with the same parent twice
861
 
        w1 = InstrumentedWeave()
862
 
        w2 = InstrumentedWeave()
863
 
 
864
 
        txt0 = ['a\n']
865
 
        txt1 = ['a\n', 'b\n']
866
 
        txt2 = ['a\n', 'c\n']
867
 
        txt3 = ['a\n', 'b\n', 'c\n']
868
 
 
869
 
        w1.add_lines('txt0', [], txt0)
870
 
        w2.add_lines('txt0', [], txt0)
871
 
        w1.add_lines('txt1', ['txt0'], txt1)
872
 
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
873
 
        # Same text, effectively the same, because the
874
 
        # parent is only repeated
875
 
        w1.join(w2) # extract 3a to add txt2 
876
 
        w2.join(w1) # extract 3b to add txt1 
877
 
 
878
 
 
879
 
class TestNeedsReweave(TestCase):
880
 
    """Internal corner cases for when reweave is needed."""
881
 
 
882
 
    def test_compatible_parents(self):
883
 
        w1 = Weave('a')
884
 
        my_parents = set([1, 2, 3])
885
 
        # subsets are ok
886
 
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
887
 
        # same sets
888
 
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
889
 
        # same empty corner case
890
 
        self.assertTrue(w1._compatible_parents(set(), set()))
891
 
        # other cannot contain stuff my_parents does not
892
 
        self.assertFalse(w1._compatible_parents(set(), set([1])))
893
 
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
894
 
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
895
 
 
896
 
 
897
 
class TestWeaveFile(TestCaseInTempDir):
898
 
    
899
 
    def test_empty_file(self):
900
 
        f = open('empty.weave', 'wb+')
901
 
        try:
902
 
            self.assertRaises(errors.WeaveFormatError,
903
 
                              read_weave, f)
904
 
        finally:
905
 
            f.close()