~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: John Arbash Meinel
  • Date: 2007-12-10 16:46:00 UTC
  • mto: (3112.1.1 bzr_access)
  • mto: This revision was merged to the branch mainline in revision 3165.
  • Revision ID: john@arbash-meinel.com-20071210164600-xcvl9fto3gn5aqtj
Change the indentation to 4 spaces according to Bazaar style guidelines.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave 
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from pprint import pformat
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    )
 
29
from bzrlib.osutils import sha_string
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
31
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
 
32
from bzrlib.weavefile import write_weave, read_weave
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
 
 
43
    def check_read_write(self, k):
 
44
        """Check the weave k can be written & re-read."""
 
45
        from tempfile import TemporaryFile
 
46
        tf = TemporaryFile()
 
47
 
 
48
        write_weave(k, tf)
 
49
        tf.seek(0)
 
50
        k2 = read_weave(tf)
 
51
 
 
52
        if k != k2:
 
53
            tf.seek(0)
 
54
            self.log('serialized weave:')
 
55
            self.log(tf.read())
 
56
 
 
57
            self.log('')
 
58
            self.log('parents: %s' % (k._parents == k2._parents))
 
59
            self.log('         %r' % k._parents)
 
60
            self.log('         %r' % k2._parents)
 
61
            self.log('')
 
62
            self.fail('read/write check failed')
 
63
 
 
64
 
 
65
class WeaveContains(TestBase):
 
66
    """Weave __contains__ operator"""
 
67
    def runTest(self):
 
68
        k = Weave()
 
69
        self.assertFalse('foo' in k)
 
70
        k.add_lines('foo', [], TEXT_1)
 
71
        self.assertTrue('foo' in k)
 
72
 
 
73
 
 
74
class Easy(TestBase):
 
75
    def runTest(self):
 
76
        k = Weave()
 
77
 
 
78
 
 
79
class AnnotateOne(TestBase):
 
80
    def runTest(self):
 
81
        k = Weave()
 
82
        k.add_lines('text0', [], TEXT_0)
 
83
        self.assertEqual(k.annotate('text0'),
 
84
                         [('text0', TEXT_0[0])])
 
85
 
 
86
 
 
87
class GetSha1(TestBase):
 
88
    def test_get_sha1(self):
 
89
        k = Weave()
 
90
        k.add_lines('text0', [], 'text0')
 
91
        self.assertEqual('34dc0e430c642a26c3dd1c2beb7a8b4f4445eb79',
 
92
                         k.get_sha1('text0'))
 
93
        self.assertRaises(errors.RevisionNotPresent,
 
94
                          k.get_sha1, 0)
 
95
        self.assertRaises(errors.RevisionNotPresent,
 
96
                          k.get_sha1, 'text1')
 
97
                        
 
98
 
 
99
class InvalidAdd(TestBase):
 
100
    """Try to use invalid version number during add."""
 
101
    def runTest(self):
 
102
        k = Weave()
 
103
 
 
104
        self.assertRaises(errors.RevisionNotPresent,
 
105
                          k.add_lines,
 
106
                          'text0',
 
107
                          ['69'],
 
108
                          ['new text!'])
 
109
 
 
110
 
 
111
class RepeatedAdd(TestBase):
 
112
    """Add the same version twice; harmless."""
 
113
 
 
114
    def test_duplicate_add(self):
 
115
        k = Weave()
 
116
        idx = k.add_lines('text0', [], TEXT_0)
 
117
        idx2 = k.add_lines('text0', [], TEXT_0)
 
118
        self.assertEqual(idx, idx2)
 
119
 
 
120
 
 
121
class InvalidRepeatedAdd(TestBase):
 
122
    def runTest(self):
 
123
        k = Weave()
 
124
        k.add_lines('basis', [], TEXT_0)
 
125
        idx = k.add_lines('text0', [], TEXT_0)
 
126
        self.assertRaises(errors.RevisionAlreadyPresent,
 
127
                          k.add_lines,
 
128
                          'text0',
 
129
                          [],
 
130
                          ['not the same text'])
 
131
        self.assertRaises(errors.RevisionAlreadyPresent,
 
132
                          k.add_lines,
 
133
                          'text0',
 
134
                          ['basis'],         # not the right parents
 
135
                          TEXT_0)
 
136
        
 
137
 
 
138
class InsertLines(TestBase):
 
139
    """Store a revision that adds one line to the original.
 
140
 
 
141
    Look at the annotations to make sure that the first line is matched
 
142
    and not stored repeatedly."""
 
143
    def runTest(self):
 
144
        k = Weave()
 
145
 
 
146
        k.add_lines('text0', [], ['line 1'])
 
147
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
148
 
 
149
        self.assertEqual(k.annotate('text0'),
 
150
                         [('text0', 'line 1')])
 
151
 
 
152
        self.assertEqual(k.get_lines(1),
 
153
                         ['line 1',
 
154
                          'line 2'])
 
155
 
 
156
        self.assertEqual(k.annotate('text1'),
 
157
                         [('text0', 'line 1'),
 
158
                          ('text1', 'line 2')])
 
159
 
 
160
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
161
 
 
162
        self.assertEqual(k.annotate('text2'),
 
163
                         [('text0', 'line 1'),
 
164
                          ('text2', 'diverged line')])
 
165
 
 
166
        text3 = ['line 1', 'middle line', 'line 2']
 
167
        k.add_lines('text3',
 
168
              ['text0', 'text1'],
 
169
              text3)
 
170
 
 
171
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
172
 
 
173
        self.log("k._weave=" + pformat(k._weave))
 
174
 
 
175
        self.assertEqual(k.annotate('text3'),
 
176
                         [('text0', 'line 1'),
 
177
                          ('text3', 'middle line'),
 
178
                          ('text1', 'line 2')])
 
179
 
 
180
        # now multiple insertions at different places
 
181
        k.add_lines('text4',
 
182
              ['text0', 'text1', 'text3'],
 
183
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
184
 
 
185
        self.assertEqual(k.annotate('text4'), 
 
186
                         [('text0', 'line 1'),
 
187
                          ('text4', 'aaa'),
 
188
                          ('text3', 'middle line'),
 
189
                          ('text4', 'bbb'),
 
190
                          ('text1', 'line 2'),
 
191
                          ('text4', 'ccc')])
 
192
 
 
193
 
 
194
class DeleteLines(TestBase):
 
195
    """Deletion of lines from existing text.
 
196
 
 
197
    Try various texts all based on a common ancestor."""
 
198
    def runTest(self):
 
199
        k = Weave()
 
200
 
 
201
        base_text = ['one', 'two', 'three', 'four']
 
202
 
 
203
        k.add_lines('text0', [], base_text)
 
204
        
 
205
        texts = [['one', 'two', 'three'],
 
206
                 ['two', 'three', 'four'],
 
207
                 ['one', 'four'],
 
208
                 ['one', 'two', 'three', 'four'],
 
209
                 ]
 
210
 
 
211
        i = 1
 
212
        for t in texts:
 
213
            ver = k.add_lines('text%d' % i,
 
214
                        ['text0'], t)
 
215
            i += 1
 
216
 
 
217
        self.log('final weave:')
 
218
        self.log('k._weave=' + pformat(k._weave))
 
219
 
 
220
        for i in range(len(texts)):
 
221
            self.assertEqual(k.get_lines(i+1),
 
222
                             texts[i])
 
223
 
 
224
 
 
225
class SuicideDelete(TestBase):
 
226
    """Invalid weave which tries to add and delete simultaneously."""
 
227
    def runTest(self):
 
228
        k = Weave()
 
229
 
 
230
        k._parents = [(),
 
231
                ]
 
232
        k._weave = [('{', 0),
 
233
                'first line',
 
234
                ('[', 0),
 
235
                'deleted in 0',
 
236
                (']', 0),
 
237
                ('}', 0),
 
238
                ]
 
239
        ################################### SKIPPED
 
240
        # Weave.get doesn't trap this anymore
 
241
        return 
 
242
 
 
243
        self.assertRaises(WeaveFormatError,
 
244
                          k.get_lines,
 
245
                          0)        
 
246
 
 
247
 
 
248
class CannedDelete(TestBase):
 
249
    """Unpack canned weave with deleted lines."""
 
250
    def runTest(self):
 
251
        k = Weave()
 
252
 
 
253
        k._parents = [(),
 
254
                frozenset([0]),
 
255
                ]
 
256
        k._weave = [('{', 0),
 
257
                'first line',
 
258
                ('[', 1),
 
259
                'line to be deleted',
 
260
                (']', 1),
 
261
                'last line',
 
262
                ('}', 0),
 
263
                ]
 
264
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
265
                  , sha_string('first linelast line')]
 
266
 
 
267
        self.assertEqual(k.get_lines(0),
 
268
                         ['first line',
 
269
                          'line to be deleted',
 
270
                          'last line',
 
271
                          ])
 
272
 
 
273
        self.assertEqual(k.get_lines(1),
 
274
                         ['first line',
 
275
                          'last line',
 
276
                          ])
 
277
 
 
278
 
 
279
class CannedReplacement(TestBase):
 
280
    """Unpack canned weave with deleted lines."""
 
281
    def runTest(self):
 
282
        k = Weave()
 
283
 
 
284
        k._parents = [frozenset(),
 
285
                frozenset([0]),
 
286
                ]
 
287
        k._weave = [('{', 0),
 
288
                'first line',
 
289
                ('[', 1),
 
290
                'line to be deleted',
 
291
                (']', 1),
 
292
                ('{', 1),
 
293
                'replacement line',                
 
294
                ('}', 1),
 
295
                'last line',
 
296
                ('}', 0),
 
297
                ]
 
298
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
299
                  , sha_string('first linereplacement linelast line')]
 
300
 
 
301
        self.assertEqual(k.get_lines(0),
 
302
                         ['first line',
 
303
                          'line to be deleted',
 
304
                          'last line',
 
305
                          ])
 
306
 
 
307
        self.assertEqual(k.get_lines(1),
 
308
                         ['first line',
 
309
                          'replacement line',
 
310
                          'last line',
 
311
                          ])
 
312
 
 
313
 
 
314
class BadWeave(TestBase):
 
315
    """Test that we trap an insert which should not occur."""
 
316
    def runTest(self):
 
317
        k = Weave()
 
318
 
 
319
        k._parents = [frozenset(),
 
320
                ]
 
321
        k._weave = ['bad line',
 
322
                ('{', 0),
 
323
                'foo {',
 
324
                ('{', 1),
 
325
                '  added in version 1',
 
326
                ('{', 2),
 
327
                '  added in v2',
 
328
                ('}', 2),
 
329
                '  also from v1',
 
330
                ('}', 1),
 
331
                '}',
 
332
                ('}', 0)]
 
333
 
 
334
        ################################### SKIPPED
 
335
        # Weave.get doesn't trap this anymore
 
336
        return 
 
337
 
 
338
 
 
339
        self.assertRaises(WeaveFormatError,
 
340
                          k.get,
 
341
                          0)
 
342
 
 
343
 
 
344
class BadInsert(TestBase):
 
345
    """Test that we trap an insert which should not occur."""
 
346
    def runTest(self):
 
347
        k = Weave()
 
348
 
 
349
        k._parents = [frozenset(),
 
350
                frozenset([0]),
 
351
                frozenset([0]),
 
352
                frozenset([0,1,2]),
 
353
                ]
 
354
        k._weave = [('{', 0),
 
355
                'foo {',
 
356
                ('{', 1),
 
357
                '  added in version 1',
 
358
                ('{', 1),
 
359
                '  more in 1',
 
360
                ('}', 1),
 
361
                ('}', 1),
 
362
                ('}', 0)]
 
363
 
 
364
 
 
365
        # this is not currently enforced by get
 
366
        return  ##########################################
 
367
 
 
368
        self.assertRaises(WeaveFormatError,
 
369
                          k.get,
 
370
                          0)
 
371
 
 
372
        self.assertRaises(WeaveFormatError,
 
373
                          k.get,
 
374
                          1)
 
375
 
 
376
 
 
377
class InsertNested(TestBase):
 
378
    """Insertion with nested instructions."""
 
379
    def runTest(self):
 
380
        k = Weave()
 
381
 
 
382
        k._parents = [frozenset(),
 
383
                frozenset([0]),
 
384
                frozenset([0]),
 
385
                frozenset([0,1,2]),
 
386
                ]
 
387
        k._weave = [('{', 0),
 
388
                'foo {',
 
389
                ('{', 1),
 
390
                '  added in version 1',
 
391
                ('{', 2),
 
392
                '  added in v2',
 
393
                ('}', 2),
 
394
                '  also from v1',
 
395
                ('}', 1),
 
396
                '}',
 
397
                ('}', 0)]
 
398
 
 
399
        k._sha1s = [sha_string('foo {}')
 
400
                  , sha_string('foo {  added in version 1  also from v1}')
 
401
                  , sha_string('foo {  added in v2}')
 
402
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
403
                  ]
 
404
 
 
405
        self.assertEqual(k.get_lines(0),
 
406
                         ['foo {',
 
407
                          '}'])
 
408
 
 
409
        self.assertEqual(k.get_lines(1),
 
410
                         ['foo {',
 
411
                          '  added in version 1',
 
412
                          '  also from v1',
 
413
                          '}'])
 
414
                       
 
415
        self.assertEqual(k.get_lines(2),
 
416
                         ['foo {',
 
417
                          '  added in v2',
 
418
                          '}'])
 
419
 
 
420
        self.assertEqual(k.get_lines(3),
 
421
                         ['foo {',
 
422
                          '  added in version 1',
 
423
                          '  added in v2',
 
424
                          '  also from v1',
 
425
                          '}'])
 
426
                         
 
427
 
 
428
class DeleteLines2(TestBase):
 
429
    """Test recording revisions that delete lines.
 
430
 
 
431
    This relies on the weave having a way to represent lines knocked
 
432
    out by a later revision."""
 
433
    def runTest(self):
 
434
        k = Weave()
 
435
 
 
436
        k.add_lines('text0', [], ["line the first",
 
437
                   "line 2",
 
438
                   "line 3",
 
439
                   "fine"])
 
440
 
 
441
        self.assertEqual(len(k.get_lines(0)), 4)
 
442
 
 
443
        k.add_lines('text1', ['text0'], ["line the first",
 
444
                   "fine"])
 
445
 
 
446
        self.assertEqual(k.get_lines(1),
 
447
                         ["line the first",
 
448
                          "fine"])
 
449
 
 
450
        self.assertEqual(k.annotate('text1'),
 
451
                         [('text0', "line the first"),
 
452
                          ('text0', "fine")])
 
453
 
 
454
 
 
455
class IncludeVersions(TestBase):
 
456
    """Check texts that are stored across multiple revisions.
 
457
 
 
458
    Here we manually create a weave with particular encoding and make
 
459
    sure it unpacks properly.
 
460
 
 
461
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
462
    lines.
 
463
    """
 
464
 
 
465
    def runTest(self):
 
466
        k = Weave()
 
467
 
 
468
        k._parents = [frozenset(), frozenset([0])]
 
469
        k._weave = [('{', 0),
 
470
                "first line",
 
471
                ('}', 0),
 
472
                ('{', 1),
 
473
                "second line",
 
474
                ('}', 1)]
 
475
 
 
476
        k._sha1s = [sha_string('first line')
 
477
                  , sha_string('first linesecond line')]
 
478
 
 
479
        self.assertEqual(k.get_lines(1),
 
480
                         ["first line",
 
481
                          "second line"])
 
482
 
 
483
        self.assertEqual(k.get_lines(0),
 
484
                         ["first line"])
 
485
 
 
486
 
 
487
class DivergedIncludes(TestBase):
 
488
    """Weave with two diverged texts based on version 0.
 
489
    """
 
490
    def runTest(self):
 
491
        # FIXME make the weave, dont poke at it.
 
492
        k = Weave()
 
493
 
 
494
        k._names = ['0', '1', '2']
 
495
        k._name_map = {'0':0, '1':1, '2':2}
 
496
        k._parents = [frozenset(),
 
497
                frozenset([0]),
 
498
                frozenset([0]),
 
499
                ]
 
500
        k._weave = [('{', 0),
 
501
                "first line",
 
502
                ('}', 0),
 
503
                ('{', 1),
 
504
                "second line",
 
505
                ('}', 1),
 
506
                ('{', 2),
 
507
                "alternative second line",
 
508
                ('}', 2),                
 
509
                ]
 
510
 
 
511
        k._sha1s = [sha_string('first line')
 
512
                  , sha_string('first linesecond line')
 
513
                  , sha_string('first linealternative second line')]
 
514
 
 
515
        self.assertEqual(k.get_lines(0),
 
516
                         ["first line"])
 
517
 
 
518
        self.assertEqual(k.get_lines(1),
 
519
                         ["first line",
 
520
                          "second line"])
 
521
 
 
522
        self.assertEqual(k.get_lines('2'),
 
523
                         ["first line",
 
524
                          "alternative second line"])
 
525
 
 
526
        self.assertEqual(list(k.get_ancestry(['2'])),
 
527
                         ['0', '2'])
 
528
 
 
529
 
 
530
class ReplaceLine(TestBase):
 
531
    def runTest(self):
 
532
        k = Weave()
 
533
 
 
534
        text0 = ['cheddar', 'stilton', 'gruyere']
 
535
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
536
        
 
537
        k.add_lines('text0', [], text0)
 
538
        k.add_lines('text1', ['text0'], text1)
 
539
 
 
540
        self.log('k._weave=' + pformat(k._weave))
 
541
 
 
542
        self.assertEqual(k.get_lines(0), text0)
 
543
        self.assertEqual(k.get_lines(1), text1)
 
544
 
 
545
 
 
546
class Merge(TestBase):
 
547
    """Storage of versions that merge diverged parents"""
 
548
    def runTest(self):
 
549
        k = Weave()
 
550
 
 
551
        texts = [['header'],
 
552
                 ['header', '', 'line from 1'],
 
553
                 ['header', '', 'line from 2', 'more from 2'],
 
554
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
555
                 ]
 
556
 
 
557
        k.add_lines('text0', [], texts[0])
 
558
        k.add_lines('text1', ['text0'], texts[1])
 
559
        k.add_lines('text2', ['text0'], texts[2])
 
560
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
561
 
 
562
        for i, t in enumerate(texts):
 
563
            self.assertEqual(k.get_lines(i), t)
 
564
 
 
565
        self.assertEqual(k.annotate('merge'),
 
566
                         [('text0', 'header'),
 
567
                          ('text1', ''),
 
568
                          ('text1', 'line from 1'),
 
569
                          ('merge', 'fixup line'),
 
570
                          ('text2', 'line from 2'),
 
571
                          ])
 
572
 
 
573
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
574
                         ['text0', 'text1', 'text2', 'merge'])
 
575
 
 
576
        self.log('k._weave=' + pformat(k._weave))
 
577
 
 
578
        self.check_read_write(k)
 
579
 
 
580
 
 
581
class Conflicts(TestBase):
 
582
    """Test detection of conflicting regions during a merge.
 
583
 
 
584
    A base version is inserted, then two descendents try to
 
585
    insert different lines in the same place.  These should be
 
586
    reported as a possible conflict and forwarded to the user."""
 
587
    def runTest(self):
 
588
        return  # NOT RUN
 
589
        k = Weave()
 
590
 
 
591
        k.add_lines([], ['aaa', 'bbb'])
 
592
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
593
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
594
 
 
595
        merged = k.merge([1, 2])
 
596
 
 
597
        self.assertEquals([[['aaa']],
 
598
                           [['111'], ['222']],
 
599
                           [['bbb']]])
 
600
 
 
601
 
 
602
class NonConflict(TestBase):
 
603
    """Two descendants insert compatible changes.
 
604
 
 
605
    No conflict should be reported."""
 
606
    def runTest(self):
 
607
        return  # NOT RUN
 
608
        k = Weave()
 
609
 
 
610
        k.add_lines([], ['aaa', 'bbb'])
 
611
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
612
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
613
 
 
614
 
 
615
class Khayyam(TestBase):
 
616
    """Test changes to multi-line texts, and read/write"""
 
617
 
 
618
    def test_multi_line_merge(self):
 
619
        rawtexts = [
 
620
            """A Book of Verses underneath the Bough,
 
621
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
622
            Beside me singing in the Wilderness --
 
623
            Oh, Wilderness were Paradise enow!""",
 
624
            
 
625
            """A Book of Verses underneath the Bough,
 
626
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
627
            Beside me singing in the Wilderness --
 
628
            Oh, Wilderness were Paradise now!""",
 
629
 
 
630
            """A Book of poems underneath the tree,
 
631
            A Jug of Wine, a Loaf of Bread,
 
632
            and Thou
 
633
            Beside me singing in the Wilderness --
 
634
            Oh, Wilderness were Paradise now!
 
635
 
 
636
            -- O. Khayyam""",
 
637
 
 
638
            """A Book of Verses underneath the Bough,
 
639
            A Jug of Wine, a Loaf of Bread,
 
640
            and Thou
 
641
            Beside me singing in the Wilderness --
 
642
            Oh, Wilderness were Paradise now!""",
 
643
            ]
 
644
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
645
 
 
646
        k = Weave()
 
647
        parents = set()
 
648
        i = 0
 
649
        for t in texts:
 
650
            ver = k.add_lines('text%d' % i,
 
651
                        list(parents), t)
 
652
            parents.add('text%d' % i)
 
653
            i += 1
 
654
 
 
655
        self.log("k._weave=" + pformat(k._weave))
 
656
 
 
657
        for i, t in enumerate(texts):
 
658
            self.assertEqual(k.get_lines(i), t)
 
659
 
 
660
        self.check_read_write(k)
 
661
 
 
662
 
 
663
class JoinWeavesTests(TestBase):
 
664
    def setUp(self):
 
665
        super(JoinWeavesTests, self).setUp()
 
666
        self.weave1 = Weave()
 
667
        self.lines1 = ['hello\n']
 
668
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
669
        self.weave1.add_lines('v1', [], self.lines1)
 
670
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
671
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
672
        
 
673
    def test_join_empty(self):
 
674
        """Join two empty weaves."""
 
675
        eq = self.assertEqual
 
676
        w1 = Weave()
 
677
        w2 = Weave()
 
678
        w1.join(w2)
 
679
        eq(len(w1), 0)
 
680
        
 
681
    def test_join_empty_to_nonempty(self):
 
682
        """Join empty weave onto nonempty."""
 
683
        self.weave1.join(Weave())
 
684
        self.assertEqual(len(self.weave1), 3)
 
685
 
 
686
    def test_join_unrelated(self):
 
687
        """Join two weaves with no history in common."""
 
688
        wb = Weave()
 
689
        wb.add_lines('b1', [], ['line from b\n'])
 
690
        w1 = self.weave1
 
691
        w1.join(wb)
 
692
        eq = self.assertEqual
 
693
        eq(len(w1), 4)
 
694
        eq(sorted(w1.versions()),
 
695
           ['b1', 'v1', 'v2', 'v3'])
 
696
 
 
697
    def test_join_related(self):
 
698
        wa = self.weave1.copy()
 
699
        wb = self.weave1.copy()
 
700
        wa.add_lines('a1', ['v3'], ['hello\n', 'sweet\n', 'world\n'])
 
701
        wb.add_lines('b1', ['v3'], ['hello\n', 'pale blue\n', 'world\n'])
 
702
        eq = self.assertEquals
 
703
        eq(len(wa), 4)
 
704
        eq(len(wb), 4)
 
705
        wa.join(wb)
 
706
        eq(len(wa), 5)
 
707
        eq(wa.get_lines('b1'),
 
708
           ['hello\n', 'pale blue\n', 'world\n'])
 
709
 
 
710
    def test_join_text_disagreement(self):
 
711
        """Cannot join weaves with different texts for a version."""
 
712
        wa = Weave()
 
713
        wb = Weave()
 
714
        wa.add_lines('v1', [], ['hello\n'])
 
715
        wb.add_lines('v1', [], ['not\n', 'hello\n'])
 
716
        self.assertRaises(WeaveError,
 
717
                          wa.join, wb)
 
718
 
 
719
    def test_join_unordered(self):
 
720
        """Join weaves where indexes differ.
 
721
        
 
722
        The source weave contains a different version at index 0."""
 
723
        wa = self.weave1.copy()
 
724
        wb = Weave()
 
725
        wb.add_lines('x1', [], ['line from x1\n'])
 
726
        wb.add_lines('v1', [], ['hello\n'])
 
727
        wb.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
728
        wa.join(wb)
 
729
        eq = self.assertEquals
 
730
        eq(sorted(wa.versions()), ['v1', 'v2', 'v3', 'x1',])
 
731
        eq(wa.get_text('x1'), 'line from x1\n')
 
732
 
 
733
    def test_written_detection(self):
 
734
        # Test detection of weave file corruption.
 
735
        #
 
736
        # Make sure that we can detect if a weave file has
 
737
        # been corrupted. This doesn't test all forms of corruption,
 
738
        # but it at least helps verify the data you get, is what you want.
 
739
        from cStringIO import StringIO
 
740
 
 
741
        w = Weave()
 
742
        w.add_lines('v1', [], ['hello\n'])
 
743
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
744
 
 
745
        tmpf = StringIO()
 
746
        write_weave(w, tmpf)
 
747
 
 
748
        # Because we are corrupting, we need to make sure we have the exact text
 
749
        self.assertEquals('# bzr weave file v5\n'
 
750
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
751
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
752
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
753
                          tmpf.getvalue())
 
754
 
 
755
        # Change a single letter
 
756
        tmpf = StringIO('# bzr weave file v5\n'
 
757
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
758
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
759
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
760
 
 
761
        w = read_weave(tmpf)
 
762
 
 
763
        self.assertEqual('hello\n', w.get_text('v1'))
 
764
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
765
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
766
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
767
 
 
768
        # Change the sha checksum
 
769
        tmpf = StringIO('# bzr weave file v5\n'
 
770
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
771
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
772
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
773
 
 
774
        w = read_weave(tmpf)
 
775
 
 
776
        self.assertEqual('hello\n', w.get_text('v1'))
 
777
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
778
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
779
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
780
 
 
781
 
 
782
class InstrumentedWeave(Weave):
 
783
    """Keep track of how many times functions are called."""
 
784
    
 
785
    def __init__(self, weave_name=None):
 
786
        self._extract_count = 0
 
787
        Weave.__init__(self, weave_name=weave_name)
 
788
 
 
789
    def _extract(self, versions):
 
790
        self._extract_count += 1
 
791
        return Weave._extract(self, versions)
 
792
 
 
793
 
 
794
class JoinOptimization(TestCase):
 
795
    """Test that Weave.join() doesn't extract all texts, only what must be done."""
 
796
 
 
797
    def test_join(self):
 
798
        w1 = InstrumentedWeave()
 
799
        w2 = InstrumentedWeave()
 
800
 
 
801
        txt0 = ['a\n']
 
802
        txt1 = ['a\n', 'b\n']
 
803
        txt2 = ['a\n', 'c\n']
 
804
        txt3 = ['a\n', 'b\n', 'c\n']
 
805
 
 
806
        w1.add_lines('txt0', [], txt0) # extract 1a
 
807
        w2.add_lines('txt0', [], txt0) # extract 1b
 
808
        w1.add_lines('txt1', ['txt0'], txt1)# extract 2a
 
809
        w2.add_lines('txt2', ['txt0'], txt2)# extract 2b
 
810
        w1.join(w2) # extract 3a to add txt2 
 
811
        w2.join(w1) # extract 3b to add txt1 
 
812
 
 
813
        w1.add_lines('txt3', ['txt1', 'txt2'], txt3) # extract 4a 
 
814
        w2.add_lines('txt3', ['txt2', 'txt1'], txt3) # extract 4b
 
815
        # These secretly have inverted parents
 
816
 
 
817
        # This should not have to do any extractions
 
818
        w1.join(w2) # NO extract, texts already present with same parents
 
819
        w2.join(w1) # NO extract, texts already present with same parents
 
820
 
 
821
        self.assertEqual(4, w1._extract_count)
 
822
        self.assertEqual(4, w2._extract_count)
 
823
 
 
824
    def test_double_parent(self):
 
825
        # It should not be considered illegal to add
 
826
        # a revision with the same parent twice
 
827
        w1 = InstrumentedWeave()
 
828
        w2 = InstrumentedWeave()
 
829
 
 
830
        txt0 = ['a\n']
 
831
        txt1 = ['a\n', 'b\n']
 
832
        txt2 = ['a\n', 'c\n']
 
833
        txt3 = ['a\n', 'b\n', 'c\n']
 
834
 
 
835
        w1.add_lines('txt0', [], txt0)
 
836
        w2.add_lines('txt0', [], txt0)
 
837
        w1.add_lines('txt1', ['txt0'], txt1)
 
838
        w2.add_lines('txt1', ['txt0', 'txt0'], txt1)
 
839
        # Same text, effectively the same, because the
 
840
        # parent is only repeated
 
841
        w1.join(w2) # extract 3a to add txt2 
 
842
        w2.join(w1) # extract 3b to add txt1 
 
843
 
 
844
 
 
845
class TestNeedsReweave(TestCase):
 
846
    """Internal corner cases for when reweave is needed."""
 
847
 
 
848
    def test_compatible_parents(self):
 
849
        w1 = Weave('a')
 
850
        my_parents = set([1, 2, 3])
 
851
        # subsets are ok
 
852
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
853
        # same sets
 
854
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
855
        # same empty corner case
 
856
        self.assertTrue(w1._compatible_parents(set(), set()))
 
857
        # other cannot contain stuff my_parents does not
 
858
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
859
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
860
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
861
 
 
862
 
 
863
class TestWeaveFile(TestCaseInTempDir):
 
864
    
 
865
    def test_empty_file(self):
 
866
        f = open('empty.weave', 'wb+')
 
867
        try:
 
868
            self.assertRaises(errors.WeaveFormatError,
 
869
                              read_weave, f)
 
870
        finally:
 
871
            f.close()