~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Martin Pool
  • Date: 2007-03-24 00:06:57 UTC
  • mto: (2323.5.3 0.15)
  • mto: This revision was merged to the branch mainline in revision 2390.
  • Revision ID: mbp@sourcefrog.net-20070324000657-fkotsej7quseardh
prepare rc3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#! /usr/bin/python2.4
 
2
 
 
3
# Copyright (C) 2005 Canonical Ltd
 
4
#
 
5
# This program is free software; you can redistribute it and/or modify
 
6
# it under the terms of the GNU General Public License as published by
 
7
# the Free Software Foundation; either version 2 of the License, or
 
8
# (at your option) any later version.
 
9
#
 
10
# This program is distributed in the hope that it will be useful,
 
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
# GNU General Public License for more details.
 
14
#
 
15
# You should have received a copy of the GNU General Public License
 
16
# along with this program; if not, write to the Free Software
 
17
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
18
 
 
19
 
 
20
# TODO: tests regarding version names
 
21
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
22
#       if it fails.
 
23
 
 
24
"""test suite for weave algorithm"""
 
25
 
 
26
from pprint import pformat
 
27
 
 
28
from bzrlib import (
 
29
    errors,
 
30
    )
 
31
from bzrlib.osutils import sha_string
 
32
from bzrlib.tests import TestCase, TestCaseInTempDir
 
33
from bzrlib.weave import Weave, WeaveFormatError, WeaveError, reweave
 
34
from bzrlib.weavefile import write_weave, read_weave
 
35
 
 
36
 
 
37
# texts for use in testing
 
38
TEXT_0 = ["Hello world"]
 
39
TEXT_1 = ["Hello world",
 
40
          "A second line"]
 
41
 
 
42
 
 
43
class TestBase(TestCase):
 
44
    def check_read_write(self, k):
 
45
        """Check the weave k can be written & re-read."""
 
46
        from tempfile import TemporaryFile
 
47
        tf = TemporaryFile()
 
48
 
 
49
        write_weave(k, tf)
 
50
        tf.seek(0)
 
51
        k2 = read_weave(tf)
 
52
 
 
53
        if k != k2:
 
54
            tf.seek(0)
 
55
            self.log('serialized weave:')
 
56
            self.log(tf.read())
 
57
 
 
58
            self.log('')
 
59
            self.log('parents: %s' % (k._parents == k2._parents))
 
60
            self.log('         %r' % k._parents)
 
61
            self.log('         %r' % k2._parents)
 
62
            self.log('')
 
63
            self.fail('read/write check failed')
 
64
 
 
65
 
 
66
class WeaveContains(TestBase):
 
67
    """Weave __contains__ operator"""
 
68
    def runTest(self):
 
69
        k = Weave()
 
70
        self.assertFalse('foo' in k)
 
71
        k.add_lines('foo', [], TEXT_1)
 
72
        self.assertTrue('foo' in k)
 
73
 
 
74
 
 
75
class Easy(TestBase):
 
76
    def runTest(self):
 
77
        k = Weave()
 
78
 
 
79
 
 
80
class StoreText(TestBase):
 
81
    """Store and retrieve a simple text."""
 
82
 
 
83
    def test_storing_text(self):
 
84
        k = Weave()
 
85
        idx = k.add_lines('text0', [], TEXT_0)
 
86
        self.assertEqual(k.get_lines(idx), TEXT_0)
 
87
        self.assertEqual(idx, 0)
 
88
 
 
89
 
 
90
class AnnotateOne(TestBase):
 
91
    def runTest(self):
 
92
        k = Weave()
 
93
        k.add_lines('text0', [], TEXT_0)
 
94
        self.assertEqual(k.annotate('text0'),
 
95
                         [('text0', TEXT_0[0])])
 
96
 
 
97
 
 
98
class StoreTwo(TestBase):
 
99
    def runTest(self):
 
100
        k = Weave()
 
101
 
 
102
        idx = k.add_lines('text0', [], TEXT_0)
 
103
        self.assertEqual(idx, 0)
 
104
 
 
105
        idx = k.add_lines('text1', [], TEXT_1)
 
106
        self.assertEqual(idx, 1)
 
107
 
 
108
        self.assertEqual(k.get_lines(0), TEXT_0)
 
109
        self.assertEqual(k.get_lines(1), TEXT_1)
 
110
 
 
111
 
 
112
class GetSha1(TestBase):
 
113
    def test_get_sha1(self):
 
114
        k = Weave()
 
115
        k.add_lines('text0', [], 'text0')
 
116
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
117
                         k.get_sha1('text0'))
 
118
        self.assertRaises(errors.RevisionNotPresent,
 
119
                          k.get_sha1, 0)
 
120
        self.assertRaises(errors.RevisionNotPresent,
 
121
                          k.get_sha1, 'text1')
 
122
                        
 
123
 
 
124
class InvalidAdd(TestBase):
 
125
    """Try to use invalid version number during add."""
 
126
    def runTest(self):
 
127
        k = Weave()
 
128
 
 
129
        self.assertRaises(errors.RevisionNotPresent,
 
130
                          k.add_lines,
 
131
                          'text0',
 
132
                          ['69'],
 
133
                          ['new text!'])
 
134
 
 
135
 
 
136
class RepeatedAdd(TestBase):
 
137
    """Add the same version twice; harmless."""
 
138
    def runTest(self):
 
139
        k = Weave()
 
140
        idx = k.add_lines('text0', [], TEXT_0)
 
141
        idx2 = k.add_lines('text0', [], TEXT_0)
 
142
        self.assertEqual(idx, idx2)
 
143
 
 
144
 
 
145
class InvalidRepeatedAdd(TestBase):
 
146
    def runTest(self):
 
147
        k = Weave()
 
148
        k.add_lines('basis', [], TEXT_0)
 
149
        idx = k.add_lines('text0', [], TEXT_0)
 
150
        self.assertRaises(errors.RevisionAlreadyPresent,
 
151
                          k.add_lines,
 
152
                          'text0',
 
153
                          [],
 
154
                          ['not the same text'])
 
155
        self.assertRaises(errors.RevisionAlreadyPresent,
 
156
                          k.add_lines,
 
157
                          'text0',
 
158
                          ['basis'],         # not the right parents
 
159
                          TEXT_0)
 
160
        
 
161
 
 
162
class InsertLines(TestBase):
 
163
    """Store a revision that adds one line to the original.
 
164
 
 
165
    Look at the annotations to make sure that the first line is matched
 
166
    and not stored repeatedly."""
 
167
    def runTest(self):
 
168
        k = Weave()
 
169
 
 
170
        k.add_lines('text0', [], ['line 1'])
 
171
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
172
 
 
173
        self.assertEqual(k.annotate('text0'),
 
174
                         [('text0', 'line 1')])
 
175
 
 
176
        self.assertEqual(k.get_lines(1),
 
177
                         ['line 1',
 
178
                          'line 2'])
 
179
 
 
180
        self.assertEqual(k.annotate('text1'),
 
181
                         [('text0', 'line 1'),
 
182
                          ('text1', 'line 2')])
 
183
 
 
184
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
185
 
 
186
        self.assertEqual(k.annotate('text2'),
 
187
                         [('text0', 'line 1'),
 
188
                          ('text2', 'diverged line')])
 
189
 
 
190
        text3 = ['line 1', 'middle line', 'line 2']
 
191
        k.add_lines('text3',
 
192
              ['text0', 'text1'],
 
193
              text3)
 
194
 
 
195
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
196
 
 
197
        self.log("k._weave=" + pformat(k._weave))
 
198
 
 
199
        self.assertEqual(k.annotate('text3'),
 
200
                         [('text0', 'line 1'),
 
201
                          ('text3', 'middle line'),
 
202
                          ('text1', 'line 2')])
 
203
 
 
204
        # now multiple insertions at different places
 
205
        k.add_lines('text4',
 
206
              ['text0', 'text1', 'text3'],
 
207
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
208
 
 
209
        self.assertEqual(k.annotate('text4'), 
 
210
                         [('text0', 'line 1'),
 
211
                          ('text4', 'aaa'),
 
212
                          ('text3', 'middle line'),
 
213
                          ('text4', 'bbb'),
 
214
                          ('text1', 'line 2'),
 
215
                          ('text4', 'ccc')])
 
216
 
 
217
 
 
218
class DeleteLines(TestBase):
 
219
    """Deletion of lines from existing text.
 
220
 
 
221
    Try various texts all based on a common ancestor."""
 
222
    def runTest(self):
 
223
        k = Weave()
 
224
 
 
225
        base_text = ['one', 'two', 'three', 'four']
 
226
 
 
227
        k.add_lines('text0', [], base_text)
 
228
        
 
229
        texts = [['one', 'two', 'three'],
 
230
                 ['two', 'three', 'four'],
 
231
                 ['one', 'four'],
 
232
                 ['one', 'two', 'three', 'four'],
 
233
                 ]
 
234
 
 
235
        i = 1
 
236
        for t in texts:
 
237
            ver = k.add_lines('text%d' % i,
 
238
                        ['text0'], t)
 
239
            i += 1
 
240
 
 
241
        self.log('final weave:')
 
242
        self.log('k._weave=' + pformat(k._weave))
 
243
 
 
244
        for i in range(len(texts)):
 
245
            self.assertEqual(k.get_lines(i+1),
 
246
                             texts[i])
 
247
 
 
248
 
 
249
class SuicideDelete(TestBase):
 
250
    """Invalid weave which tries to add and delete simultaneously."""
 
251
    def runTest(self):
 
252
        k = Weave()
 
253
 
 
254
        k._parents = [(),
 
255
                ]
 
256
        k._weave = [('{', 0),
 
257
                'first line',
 
258
                ('[', 0),
 
259
                'deleted in 0',
 
260
                (']', 0),
 
261
                ('}', 0),
 
262
                ]
 
263
        ################################### SKIPPED
 
264
        # Weave.get doesn't trap this anymore
 
265
        return 
 
266
 
 
267
        self.assertRaises(WeaveFormatError,
 
268
                          k.get_lines,
 
269
                          0)        
 
270
 
 
271
 
 
272
class CannedDelete(TestBase):
 
273
    """Unpack canned weave with deleted lines."""
 
274
    def runTest(self):
 
275
        k = Weave()
 
276
 
 
277
        k._parents = [(),
 
278
                frozenset([0]),
 
279
                ]
 
280
        k._weave = [('{', 0),
 
281
                'first line',
 
282
                ('[', 1),
 
283
                'line to be deleted',
 
284
                (']', 1),
 
285
                'last line',
 
286
                ('}', 0),
 
287
                ]
 
288
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
289
                  , sha_string('first linelast line')]
 
290
 
 
291
        self.assertEqual(k.get_lines(0),
 
292
                         ['first line',
 
293
                          'line to be deleted',
 
294
                          'last line',
 
295
                          ])
 
296
 
 
297
        self.assertEqual(k.get_lines(1),
 
298
                         ['first line',
 
299
                          'last line',
 
300
                          ])
 
301
 
 
302
 
 
303
class CannedReplacement(TestBase):
 
304
    """Unpack canned weave with deleted lines."""
 
305
    def runTest(self):
 
306
        k = Weave()
 
307
 
 
308
        k._parents = [frozenset(),
 
309
                frozenset([0]),
 
310
                ]
 
311
        k._weave = [('{', 0),
 
312
                'first line',
 
313
                ('[', 1),
 
314
                'line to be deleted',
 
315
                (']', 1),
 
316
                ('{', 1),
 
317
                'replacement line',                
 
318
                ('}', 1),
 
319
                'last line',
 
320
                ('}', 0),
 
321
                ]
 
322
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
323
                  , sha_string('first linereplacement linelast line')]
 
324
 
 
325
        self.assertEqual(k.get_lines(0),
 
326
                         ['first line',
 
327
                          'line to be deleted',
 
328
                          'last line',
 
329
                          ])
 
330
 
 
331
        self.assertEqual(k.get_lines(1),
 
332
                         ['first line',
 
333
                          'replacement line',
 
334
                          'last line',
 
335
                          ])
 
336
 
 
337
 
 
338
class BadWeave(TestBase):
 
339
    """Test that we trap an insert which should not occur."""
 
340
    def runTest(self):
 
341
        k = Weave()
 
342
 
 
343
        k._parents = [frozenset(),
 
344
                ]
 
345
        k._weave = ['bad line',
 
346
                ('{', 0),
 
347
                'foo {',
 
348
                ('{', 1),
 
349
                '  added in version 1',
 
350
                ('{', 2),
 
351
                '  added in v2',
 
352
                ('}', 2),
 
353
                '  also from v1',
 
354
                ('}', 1),
 
355
                '}',
 
356
                ('}', 0)]
 
357
 
 
358
        ################################### SKIPPED
 
359
        # Weave.get doesn't trap this anymore
 
360
        return 
 
361
 
 
362
 
 
363
        self.assertRaises(WeaveFormatError,
 
364
                          k.get,
 
365
                          0)
 
366
 
 
367
 
 
368
class BadInsert(TestBase):
 
369
    """Test that we trap an insert which should not occur."""
 
370
    def runTest(self):
 
371
        k = Weave()
 
372
 
 
373
        k._parents = [frozenset(),
 
374
                frozenset([0]),
 
375
                frozenset([0]),
 
376
                frozenset([0,1,2]),
 
377
                ]
 
378
        k._weave = [('{', 0),
 
379
                'foo {',
 
380
                ('{', 1),
 
381
                '  added in version 1',
 
382
                ('{', 1),
 
383
                '  more in 1',
 
384
                ('}', 1),
 
385
                ('}', 1),
 
386
                ('}', 0)]
 
387
 
 
388
 
 
389
        # this is not currently enforced by get
 
390
        return  ##########################################
 
391
 
 
392
        self.assertRaises(WeaveFormatError,
 
393
                          k.get,
 
394
                          0)
 
395
 
 
396
        self.assertRaises(WeaveFormatError,
 
397
                          k.get,
 
398
                          1)
 
399
 
 
400
 
 
401
class InsertNested(TestBase):
 
402
    """Insertion with nested instructions."""
 
403
    def runTest(self):
 
404
        k = Weave()
 
405
 
 
406
        k._parents = [frozenset(),
 
407
                frozenset([0]),
 
408
                frozenset([0]),
 
409
                frozenset([0,1,2]),
 
410
                ]
 
411
        k._weave = [('{', 0),
 
412
                'foo {',
 
413
                ('{', 1),
 
414
                '  added in version 1',
 
415
                ('{', 2),
 
416
                '  added in v2',
 
417
                ('}', 2),
 
418
                '  also from v1',
 
419
                ('}', 1),
 
420
                '}',
 
421
                ('}', 0)]
 
422
 
 
423
        k._sha1s = [sha_string('foo {}')
 
424
                  , sha_string('foo {  added in version 1  also from v1}')
 
425
                  , sha_string('foo {  added in v2}')
 
426
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
427
                  ]
 
428
 
 
429
        self.assertEqual(k.get_lines(0),
 
430
                         ['foo {',
 
431
                          '}'])
 
432
 
 
433
        self.assertEqual(k.get_lines(1),
 
434
                         ['foo {',
 
435
                          '  added in version 1',
 
436
                          '  also from v1',
 
437
                          '}'])
 
438
                       
 
439
        self.assertEqual(k.get_lines(2),
 
440
                         ['foo {',
 
441
                          '  added in v2',
 
442
                          '}'])
 
443
 
 
444
        self.assertEqual(k.get_lines(3),
 
445
                         ['foo {',
 
446
                          '  added in version 1',
 
447
                          '  added in v2',
 
448
                          '  also from v1',
 
449
                          '}'])
 
450
                         
 
451
 
 
452
class DeleteLines2(TestBase):
 
453
    """Test recording revisions that delete lines.
 
454
 
 
455
    This relies on the weave having a way to represent lines knocked
 
456
    out by a later revision."""
 
457
    def runTest(self):
 
458
        k = Weave()
 
459
 
 
460
        k.add_lines('text0', [], ["line the first",
 
461
                   "line 2",
 
462
                   "line 3",
 
463
                   "fine"])
 
464
 
 
465
        self.assertEqual(len(k.get_lines(0)), 4)
 
466
 
 
467
        k.add_lines('text1', ['text0'], ["line the first",
 
468
                   "fine"])
 
469
 
 
470
        self.assertEqual(k.get_lines(1),
 
471
                         ["line the first",
 
472
                          "fine"])
 
473
 
 
474
        self.assertEqual(k.annotate('text1'),
 
475
                         [('text0', "line the first"),
 
476
                          ('text0', "fine")])
 
477
 
 
478
 
 
479
class IncludeVersions(TestBase):
 
480
    """Check texts that are stored across multiple revisions.
 
481
 
 
482
    Here we manually create a weave with particular encoding and make
 
483
    sure it unpacks properly.
 
484
 
 
485
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
486
    lines.
 
487
    """
 
488
 
 
489
    def runTest(self):
 
490
        k = Weave()
 
491
 
 
492
        k._parents = [frozenset(), frozenset([0])]
 
493
        k._weave = [('{', 0),
 
494
                "first line",
 
495
                ('}', 0),
 
496
                ('{', 1),
 
497
                "second line",
 
498
                ('}', 1)]
 
499
 
 
500
        k._sha1s = [sha_string('first line')
 
501
                  , sha_string('first linesecond line')]
 
502
 
 
503
        self.assertEqual(k.get_lines(1),
 
504
                         ["first line",
 
505
                          "second line"])
 
506
 
 
507
        self.assertEqual(k.get_lines(0),
 
508
                         ["first line"])
 
509
 
 
510
 
 
511
class DivergedIncludes(TestBase):
 
512
    """Weave with two diverged texts based on version 0.
 
513
    """
 
514
    def runTest(self):
 
515
        # FIXME make the weave, dont poke at it.
 
516
        k = Weave()
 
517
 
 
518
        k._names = ['0', '1', '2']
 
519
        k._name_map = {'0':0, '1':1, '2':2}
 
520
        k._parents = [frozenset(),
 
521
                frozenset([0]),
 
522
                frozenset([0]),
 
523
                ]
 
524
        k._weave = [('{', 0),
 
525
                "first line",
 
526
                ('}', 0),
 
527
                ('{', 1),
 
528
                "second line",
 
529
                ('}', 1),
 
530
                ('{', 2),
 
531
                "alternative second line",
 
532
                ('}', 2),                
 
533
                ]
 
534
 
 
535
        k._sha1s = [sha_string('first line')
 
536
                  , sha_string('first linesecond line')
 
537
                  , sha_string('first linealternative second line')]
 
538
 
 
539
        self.assertEqual(k.get_lines(0),
 
540
                         ["first line"])
 
541
 
 
542
        self.assertEqual(k.get_lines(1),
 
543
                         ["first line",
 
544
                          "second line"])
 
545
 
 
546
        self.assertEqual(k.get_lines('2'),
 
547
                         ["first line",
 
548
                          "alternative second line"])
 
549
 
 
550
        self.assertEqual(list(k.get_ancestry(['2'])),
 
551
                         ['0', '2'])
 
552
 
 
553
 
 
554
class ReplaceLine(TestBase):
 
555
    def runTest(self):
 
556
        k = Weave()
 
557
 
 
558
        text0 = ['cheddar', 'stilton', 'gruyere']
 
559
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
560
        
 
561
        k.add_lines('text0', [], text0)
 
562
        k.add_lines('text1', ['text0'], text1)
 
563
 
 
564
        self.log('k._weave=' + pformat(k._weave))
 
565
 
 
566
        self.assertEqual(k.get_lines(0), text0)
 
567
        self.assertEqual(k.get_lines(1), text1)
 
568
 
 
569
 
 
570
class Merge(TestBase):
 
571
    """Storage of versions that merge diverged parents"""
 
572
    def runTest(self):
 
573
        k = Weave()
 
574
 
 
575
        texts = [['header'],
 
576
                 ['header', '', 'line from 1'],
 
577
                 ['header', '', 'line from 2', 'more from 2'],
 
578
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
579
                 ]
 
580
 
 
581
        k.add_lines('text0', [], texts[0])
 
582
        k.add_lines('text1', ['text0'], texts[1])
 
583
        k.add_lines('text2', ['text0'], texts[2])
 
584
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
585
 
 
586
        for i, t in enumerate(texts):
 
587
            self.assertEqual(k.get_lines(i), t)
 
588
 
 
589
        self.assertEqual(k.annotate('merge'),
 
590
                         [('text0', 'header'),
 
591
                          ('text1', ''),
 
592
                          ('text1', 'line from 1'),
 
593
                          ('merge', 'fixup line'),
 
594
                          ('text2', 'line from 2'),
 
595
                          ])
 
596
 
 
597
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
598
                         ['text0', 'text1', 'text2', 'merge'])
 
599
 
 
600
        self.log('k._weave=' + pformat(k._weave))
 
601
 
 
602
        self.check_read_write(k)
 
603
 
 
604
 
 
605
class Conflicts(TestBase):
 
606
    """Test detection of conflicting regions during a merge.
 
607
 
 
608
    A base version is inserted, then two descendents try to
 
609
    insert different lines in the same place.  These should be
 
610
    reported as a possible conflict and forwarded to the user."""
 
611
    def runTest(self):
 
612
        return  # NOT RUN
 
613
        k = Weave()
 
614
 
 
615
        k.add_lines([], ['aaa', 'bbb'])
 
616
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
617
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
618
 
 
619
        merged = k.merge([1, 2])
 
620
 
 
621
        self.assertEquals([[['aaa']],
 
622
                           [['111'], ['222']],
 
623
                           [['bbb']]])
 
624
 
 
625
 
 
626
class NonConflict(TestBase):
 
627
    """Two descendants insert compatible changes.
 
628
 
 
629
    No conflict should be reported."""
 
630
    def runTest(self):
 
631
        return  # NOT RUN
 
632
        k = Weave()
 
633
 
 
634
        k.add_lines([], ['aaa', 'bbb'])
 
635
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
636
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
637
 
 
638
 
 
639
class Khayyam(TestBase):
 
640
    """Test changes to multi-line texts, and read/write"""
 
641
 
 
642
    def test_multi_line_merge(self):
 
643
        rawtexts = [
 
644
            """A Book of Verses underneath the Bough,
 
645
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
646
            Beside me singing in the Wilderness --
 
647
            Oh, Wilderness were Paradise enow!""",
 
648
            
 
649
            """A Book of Verses underneath the Bough,
 
650
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
651
            Beside me singing in the Wilderness --
 
652
            Oh, Wilderness were Paradise now!""",
 
653
 
 
654
            """A Book of poems underneath the tree,
 
655
            A Jug of Wine, a Loaf of Bread,
 
656
            and Thou
 
657
            Beside me singing in the Wilderness --
 
658
            Oh, Wilderness were Paradise now!
 
659
 
 
660
            -- O. Khayyam""",
 
661
 
 
662
            """A Book of Verses underneath the Bough,
 
663
            A Jug of Wine, a Loaf of Bread,
 
664
            and Thou
 
665
            Beside me singing in the Wilderness --
 
666
            Oh, Wilderness were Paradise now!""",
 
667
            ]
 
668
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
669
 
 
670
        k = Weave()
 
671
        parents = set()
 
672
        i = 0
 
673
        for t in texts:
 
674
            ver = k.add_lines('text%d' % i,
 
675
                        list(parents), t)
 
676
            parents.add('text%d' % i)
 
677
            i += 1
 
678
 
 
679
        self.log("k._weave=" + pformat(k._weave))
 
680
 
 
681
        for i, t in enumerate(texts):
 
682
            self.assertEqual(k.get_lines(i), t)
 
683
 
 
684
        self.check_read_write(k)
 
685
 
 
686
 
 
687
class JoinWeavesTests(TestBase):
 
688
    def setUp(self):
 
689
        super(JoinWeavesTests, self).setUp()
 
690
        self.weave1 = Weave()
 
691
        self.lines1 = ['hello\n']
 
692
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
693
        self.weave1.add_lines('v1', [], self.lines1)
 
694
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
695
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
696
        
 
697
    def test_join_empty(self):
 
698
        """Join two empty weaves."""
 
699
        eq = self.assertEqual
 
700
        w1 = Weave()
 
701
        w2 = Weave()
 
702
        w1.join(w2)
 
703
        eq(len(w1), 0)
 
704
        
 
705
    def test_join_empty_to_nonempty(self):
 
706
        """Join empty weave onto nonempty."""
 
707
        self.weave1.join(Weave())
 
708
        self.assertEqual(len(self.weave1), 3)
 
709
 
 
710
    def test_join_unrelated(self):
 
711
        """Join two weaves with no history in common."""
 
712
        wb = Weave()
 
713
        wb.add_lines('b1', [], ['line from b\n'])
 
714
        w1 = self.weave1
 
715
        w1.join(wb)
 
716
        eq = self.assertEqual
 
717
        eq(len(w1), 4)
 
718
        eq(sorted(w1.versions()),
 
719
           ['b1', 'v1', 'v2', 'v3'])
 
720
 
 
721
    def test_join_related(self):
 
722
        wa = self.weave1.copy()
 
723
        wb = self.weave1.copy()
 
724
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
725
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
726
        eq = self.assertEquals
 
727
        eq(len(wa), 4)
 
728
        eq(len(wb), 4)
 
729
        wa.join(wb)
 
730
        eq(len(wa), 5)
 
731
        eq(wa.get_lines('b1'),
 
732
           ['hello\n', 'pale blue\n', 'world\n'])
 
733
 
 
734
    def test_join_parent_disagreement(self):
 
735
        #join reconciles differening parents into a union.
 
736
        wa = Weave()
 
737
        wb = Weave()
 
738
        wa.add_lines('v1', [], ['hello\n'])
 
739
        wb.add_lines('v0', [], [])
 
740
        wb.add_lines('v1', ['v0'], ['hello\n'])
 
741
        wa.join(wb)
 
742
        self.assertEqual(['v0'], wa.get_parents('v1'))
 
743
 
 
744
    def test_join_text_disagreement(self):
 
745
        """Cannot join weaves with different texts for a version."""
 
746
        wa = Weave()
 
747
        wb = Weave()
 
748
        wa.add_lines('v1', [], ['hello\n'])
 
749
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
750
        self.assertRaises(WeaveError,
 
751
                          wa.join, wb)
 
752
 
 
753
    def test_join_unordered(self):
 
754
        """Join weaves where indexes differ.
 
755
        
 
756
        The source weave contains a different version at index 0."""
 
757
        wa = self.weave1.copy()
 
758
        wb = Weave()
 
759
        wb.add_lines('x1', [], ['line from x1\n'])
 
760
        wb.add_lines('v1', [], ['hello\n'])
 
761
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
762
        wa.join(wb)
 
763
        eq = self.assertEquals
 
764
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
765
        eq(wa.get_text('x1'), 'line from x1\n')
 
766
 
 
767
    def test_written_detection(self):
 
768
        # Test detection of weave file corruption.
 
769
        #
 
770
        # Make sure that we can detect if a weave file has
 
771
        # been corrupted. This doesn't test all forms of corruption,
 
772
        # but it at least helps verify the data you get, is what you want.
 
773
        from cStringIO import StringIO
 
774
 
 
775
        w = Weave()
 
776
        w.add_lines('v1', [], ['hello\n'])
 
777
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
778
 
 
779
        tmpf = StringIO()
 
780
        write_weave(w, tmpf)
 
781
 
 
782
        # Because we are corrupting, we need to make sure we have the exact text
 
783
        self.assertEquals('# bzr weave file v5\n'
 
784
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
785
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
786
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
787
                          tmpf.getvalue())
 
788
 
 
789
        # Change a single letter
 
790
        tmpf = StringIO('# bzr weave file v5\n'
 
791
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
792
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
793
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
794
 
 
795
        w = read_weave(tmpf)
 
796
 
 
797
        self.assertEqual('hello\n', w.get_text('v1'))
 
798
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
799
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
800
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
801
 
 
802
        # Change the sha checksum
 
803
        tmpf = StringIO('# bzr weave file v5\n'
 
804
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
805
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
806
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
807
 
 
808
        w = read_weave(tmpf)
 
809
 
 
810
        self.assertEqual('hello\n', w.get_text('v1'))
 
811
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
812
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
813
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
814
 
 
815
 
 
816
class InstrumentedWeave(Weave):
 
817
    """Keep track of how many times functions are called."""
 
818
    
 
819
    def __init__(self, weave_name=None):
 
820
        self._extract_count = 0
 
821
        Weave.__init__(self, weave_name=weave_name)
 
822
 
 
823
    def _extract(self, versions):
 
824
        self._extract_count += 1
 
825
        return Weave._extract(self, versions)
 
826
 
 
827
 
 
828
class JoinOptimization(TestCase):
 
829
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
830
 
 
831
    def test_join(self):
 
832
        w1 = InstrumentedWeave()
 
833
        w2 = InstrumentedWeave()
 
834
 
 
835
        txt0 = ['a\n']
 
836
        txt1 = ['a\n', 'b\n']
 
837
        txt2 = ['a\n', 'c\n']
 
838
        txt3 = ['a\n', 'b\n', 'c\n']
 
839
 
 
840
        w1.add_lines('txt0', [], txt0) # extract 1a
 
841
        w2.add_lines('txt0', [], txt0) # extract 1b
 
842
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
843
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
844
        w1.join(w2) # extract 3a to add txt2 
 
845
        w2.join(w1) # extract 3b to add txt1 
 
846
 
 
847
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
848
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
849
        # These secretly have inverted parents
 
850
 
 
851
        # This should not have to do any extractions
 
852
        w1.join(w2) # NO extract, texts already present with same parents
 
853
        w2.join(w1) # NO extract, texts already present with same parents
 
854
 
 
855
        self.assertEqual(4, w1._extract_count)
 
856
        self.assertEqual(4, w2._extract_count)
 
857
 
 
858
    def test_double_parent(self):
 
859
        # It should not be considered illegal to add
 
860
        # a revision with the same parent twice
 
861
        w1 = InstrumentedWeave()
 
862
        w2 = InstrumentedWeave()
 
863
 
 
864
        txt0 = ['a\n']
 
865
        txt1 = ['a\n', 'b\n']
 
866
        txt2 = ['a\n', 'c\n']
 
867
        txt3 = ['a\n', 'b\n', 'c\n']
 
868
 
 
869
        w1.add_lines('txt0', [], txt0)
 
870
        w2.add_lines('txt0', [], txt0)
 
871
        w1.add_lines('txt1', ['txt0'], txt1)
 
872
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
873
        # Same text, effectively the same, because the
 
874
        # parent is only repeated
 
875
        w1.join(w2) # extract 3a to add txt2 
 
876
        w2.join(w1) # extract 3b to add txt1 
 
877
 
 
878
 
 
879
class TestNeedsReweave(TestCase):
 
880
    """Internal corner cases for when reweave is needed."""
 
881
 
 
882
    def test_compatible_parents(self):
 
883
        w1 = Weave('a')
 
884
        my_parents = set([1, 2, 3])
 
885
        # subsets are ok
 
886
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
887
        # same sets
 
888
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
889
        # same empty corner case
 
890
        self.assertTrue(w1._compatible_parents(set(), set()))
 
891
        # other cannot contain stuff my_parents does not
 
892
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
893
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
894
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
895
 
 
896
 
 
897
class TestWeaveFile(TestCaseInTempDir):
 
898
    
 
899
    def test_empty_file(self):
 
900
        f = open('empty.weave', 'wb+')
 
901
        try:
 
902
            self.assertRaises(errors.WeaveFormatError,
 
903
                              read_weave, f)
 
904
        finally:
 
905
            f.close()