~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Jelmer Vernooij
  • Date: 2011-11-30 20:02:16 UTC
  • mto: This revision was merged to the branch mainline in revision 6333.
  • Revision ID: jelmer@samba.org-20111130200216-aoju21pdl20d1gkd
Consistently pass tree path when exporting.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005-2009, 2011 Canonical Ltd
 
2
#
 
3
# This program is free software; you can redistribute it and/or modify
 
4
# it under the terms of the GNU General Public License as published by
 
5
# the Free Software Foundation; either version 2 of the License, or
 
6
# (at your option) any later version.
 
7
#
 
8
# This program is distributed in the hope that it will be useful,
 
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
# GNU General Public License for more details.
 
12
#
 
13
# You should have received a copy of the GNU General Public License
 
14
# along with this program; if not, write to the Free Software
 
15
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 
16
 
 
17
 
 
18
# TODO: tests regarding version names
 
19
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
 
20
#       if it fails.
 
21
 
 
22
"""test suite for weave algorithm"""
 
23
 
 
24
from pprint import pformat
 
25
 
 
26
from bzrlib import (
 
27
    errors,
 
28
    )
 
29
from bzrlib.osutils import sha_string
 
30
from bzrlib.tests import TestCase, TestCaseInTempDir
 
31
from bzrlib.weave import Weave, WeaveFormatError
 
32
from bzrlib.weavefile import write_weave, read_weave
 
33
 
 
34
 
 
35
# texts for use in testing
 
36
TEXT_0 = ["Hello world"]
 
37
TEXT_1 = ["Hello world",
 
38
          "A second line"]
 
39
 
 
40
 
 
41
class TestBase(TestCase):
 
42
 
 
43
    def check_read_write(self, k):
 
44
        """Check the weave k can be written & re-read."""
 
45
        from tempfile import TemporaryFile
 
46
        tf = TemporaryFile()
 
47
 
 
48
        write_weave(k, tf)
 
49
        tf.seek(0)
 
50
        k2 = read_weave(tf)
 
51
 
 
52
        if k != k2:
 
53
            tf.seek(0)
 
54
            self.log('serialized weave:')
 
55
            self.log(tf.read())
 
56
 
 
57
            self.log('')
 
58
            self.log('parents: %s' % (k._parents == k2._parents))
 
59
            self.log('         %r' % k._parents)
 
60
            self.log('         %r' % k2._parents)
 
61
            self.log('')
 
62
            self.fail('read/write check failed')
 
63
 
 
64
 
 
65
class WeaveContains(TestBase):
 
66
    """Weave __contains__ operator"""
 
67
 
 
68
    def runTest(self):
 
69
        k = Weave(get_scope=lambda:None)
 
70
        self.assertFalse('foo' in k)
 
71
        k.add_lines('foo', [], TEXT_1)
 
72
        self.assertTrue('foo' in k)
 
73
 
 
74
 
 
75
class Easy(TestBase):
 
76
 
 
77
    def runTest(self):
 
78
        k = Weave()
 
79
 
 
80
 
 
81
class AnnotateOne(TestBase):
 
82
 
 
83
    def runTest(self):
 
84
        k = Weave()
 
85
        k.add_lines('text0', [], TEXT_0)
 
86
        self.assertEqual(k.annotate('text0'),
 
87
                         [('text0', TEXT_0[0])])
 
88
 
 
89
 
 
90
class InvalidAdd(TestBase):
 
91
    """Try to use invalid version number during add."""
 
92
 
 
93
    def runTest(self):
 
94
        k = Weave()
 
95
 
 
96
        self.assertRaises(errors.RevisionNotPresent,
 
97
                          k.add_lines,
 
98
                          'text0',
 
99
                          ['69'],
 
100
                          ['new text!'])
 
101
 
 
102
 
 
103
class RepeatedAdd(TestBase):
 
104
    """Add the same version twice; harmless."""
 
105
 
 
106
    def test_duplicate_add(self):
 
107
        k = Weave()
 
108
        idx = k.add_lines('text0', [], TEXT_0)
 
109
        idx2 = k.add_lines('text0', [], TEXT_0)
 
110
        self.assertEqual(idx, idx2)
 
111
 
 
112
 
 
113
class InvalidRepeatedAdd(TestBase):
 
114
 
 
115
    def runTest(self):
 
116
        k = Weave()
 
117
        k.add_lines('basis', [], TEXT_0)
 
118
        idx = k.add_lines('text0', [], TEXT_0)
 
119
        self.assertRaises(errors.RevisionAlreadyPresent,
 
120
                          k.add_lines,
 
121
                          'text0',
 
122
                          [],
 
123
                          ['not the same text'])
 
124
        self.assertRaises(errors.RevisionAlreadyPresent,
 
125
                          k.add_lines,
 
126
                          'text0',
 
127
                          ['basis'],         # not the right parents
 
128
                          TEXT_0)
 
129
 
 
130
 
 
131
class InsertLines(TestBase):
 
132
    """Store a revision that adds one line to the original.
 
133
 
 
134
    Look at the annotations to make sure that the first line is matched
 
135
    and not stored repeatedly."""
 
136
    def runTest(self):
 
137
        k = Weave()
 
138
 
 
139
        k.add_lines('text0', [], ['line 1'])
 
140
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
 
141
 
 
142
        self.assertEqual(k.annotate('text0'),
 
143
                         [('text0', 'line 1')])
 
144
 
 
145
        self.assertEqual(k.get_lines(1),
 
146
                         ['line 1',
 
147
                          'line 2'])
 
148
 
 
149
        self.assertEqual(k.annotate('text1'),
 
150
                         [('text0', 'line 1'),
 
151
                          ('text1', 'line 2')])
 
152
 
 
153
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
 
154
 
 
155
        self.assertEqual(k.annotate('text2'),
 
156
                         [('text0', 'line 1'),
 
157
                          ('text2', 'diverged line')])
 
158
 
 
159
        text3 = ['line 1', 'middle line', 'line 2']
 
160
        k.add_lines('text3',
 
161
              ['text0', 'text1'],
 
162
              text3)
 
163
 
 
164
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
 
165
 
 
166
        self.log("k._weave=" + pformat(k._weave))
 
167
 
 
168
        self.assertEqual(k.annotate('text3'),
 
169
                         [('text0', 'line 1'),
 
170
                          ('text3', 'middle line'),
 
171
                          ('text1', 'line 2')])
 
172
 
 
173
        # now multiple insertions at different places
 
174
        k.add_lines('text4',
 
175
              ['text0', 'text1', 'text3'],
 
176
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
 
177
 
 
178
        self.assertEqual(k.annotate('text4'),
 
179
                         [('text0', 'line 1'),
 
180
                          ('text4', 'aaa'),
 
181
                          ('text3', 'middle line'),
 
182
                          ('text4', 'bbb'),
 
183
                          ('text1', 'line 2'),
 
184
                          ('text4', 'ccc')])
 
185
 
 
186
 
 
187
class DeleteLines(TestBase):
 
188
    """Deletion of lines from existing text.
 
189
 
 
190
    Try various texts all based on a common ancestor."""
 
191
    def runTest(self):
 
192
        k = Weave()
 
193
 
 
194
        base_text = ['one', 'two', 'three', 'four']
 
195
 
 
196
        k.add_lines('text0', [], base_text)
 
197
 
 
198
        texts = [['one', 'two', 'three'],
 
199
                 ['two', 'three', 'four'],
 
200
                 ['one', 'four'],
 
201
                 ['one', 'two', 'three', 'four'],
 
202
                 ]
 
203
 
 
204
        i = 1
 
205
        for t in texts:
 
206
            ver = k.add_lines('text%d' % i,
 
207
                        ['text0'], t)
 
208
            i += 1
 
209
 
 
210
        self.log('final weave:')
 
211
        self.log('k._weave=' + pformat(k._weave))
 
212
 
 
213
        for i in range(len(texts)):
 
214
            self.assertEqual(k.get_lines(i+1),
 
215
                             texts[i])
 
216
 
 
217
 
 
218
class SuicideDelete(TestBase):
 
219
    """Invalid weave which tries to add and delete simultaneously."""
 
220
    def runTest(self):
 
221
        k = Weave()
 
222
 
 
223
        k._parents = [(),
 
224
                ]
 
225
        k._weave = [('{', 0),
 
226
                'first line',
 
227
                ('[', 0),
 
228
                'deleted in 0',
 
229
                (']', 0),
 
230
                ('}', 0),
 
231
                ]
 
232
        ################################### SKIPPED
 
233
        # Weave.get doesn't trap this anymore
 
234
        return
 
235
 
 
236
        self.assertRaises(WeaveFormatError,
 
237
                          k.get_lines,
 
238
                          0)
 
239
 
 
240
 
 
241
class CannedDelete(TestBase):
 
242
    """Unpack canned weave with deleted lines."""
 
243
    def runTest(self):
 
244
        k = Weave()
 
245
 
 
246
        k._parents = [(),
 
247
                frozenset([0]),
 
248
                ]
 
249
        k._weave = [('{', 0),
 
250
                'first line',
 
251
                ('[', 1),
 
252
                'line to be deleted',
 
253
                (']', 1),
 
254
                'last line',
 
255
                ('}', 0),
 
256
                ]
 
257
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
258
                  , sha_string('first linelast line')]
 
259
 
 
260
        self.assertEqual(k.get_lines(0),
 
261
                         ['first line',
 
262
                          'line to be deleted',
 
263
                          'last line',
 
264
                          ])
 
265
 
 
266
        self.assertEqual(k.get_lines(1),
 
267
                         ['first line',
 
268
                          'last line',
 
269
                          ])
 
270
 
 
271
 
 
272
class CannedReplacement(TestBase):
 
273
    """Unpack canned weave with deleted lines."""
 
274
    def runTest(self):
 
275
        k = Weave()
 
276
 
 
277
        k._parents = [frozenset(),
 
278
                frozenset([0]),
 
279
                ]
 
280
        k._weave = [('{', 0),
 
281
                'first line',
 
282
                ('[', 1),
 
283
                'line to be deleted',
 
284
                (']', 1),
 
285
                ('{', 1),
 
286
                'replacement line',
 
287
                ('}', 1),
 
288
                'last line',
 
289
                ('}', 0),
 
290
                ]
 
291
        k._sha1s = [sha_string('first lineline to be deletedlast line')
 
292
                  , sha_string('first linereplacement linelast line')]
 
293
 
 
294
        self.assertEqual(k.get_lines(0),
 
295
                         ['first line',
 
296
                          'line to be deleted',
 
297
                          'last line',
 
298
                          ])
 
299
 
 
300
        self.assertEqual(k.get_lines(1),
 
301
                         ['first line',
 
302
                          'replacement line',
 
303
                          'last line',
 
304
                          ])
 
305
 
 
306
 
 
307
class BadWeave(TestBase):
 
308
    """Test that we trap an insert which should not occur."""
 
309
    def runTest(self):
 
310
        k = Weave()
 
311
 
 
312
        k._parents = [frozenset(),
 
313
                ]
 
314
        k._weave = ['bad line',
 
315
                ('{', 0),
 
316
                'foo {',
 
317
                ('{', 1),
 
318
                '  added in version 1',
 
319
                ('{', 2),
 
320
                '  added in v2',
 
321
                ('}', 2),
 
322
                '  also from v1',
 
323
                ('}', 1),
 
324
                '}',
 
325
                ('}', 0)]
 
326
 
 
327
        ################################### SKIPPED
 
328
        # Weave.get doesn't trap this anymore
 
329
        return
 
330
 
 
331
 
 
332
        self.assertRaises(WeaveFormatError,
 
333
                          k.get,
 
334
                          0)
 
335
 
 
336
 
 
337
class BadInsert(TestBase):
 
338
    """Test that we trap an insert which should not occur."""
 
339
    def runTest(self):
 
340
        k = Weave()
 
341
 
 
342
        k._parents = [frozenset(),
 
343
                frozenset([0]),
 
344
                frozenset([0]),
 
345
                frozenset([0,1,2]),
 
346
                ]
 
347
        k._weave = [('{', 0),
 
348
                'foo {',
 
349
                ('{', 1),
 
350
                '  added in version 1',
 
351
                ('{', 1),
 
352
                '  more in 1',
 
353
                ('}', 1),
 
354
                ('}', 1),
 
355
                ('}', 0)]
 
356
 
 
357
 
 
358
        # this is not currently enforced by get
 
359
        return  ##########################################
 
360
 
 
361
        self.assertRaises(WeaveFormatError,
 
362
                          k.get,
 
363
                          0)
 
364
 
 
365
        self.assertRaises(WeaveFormatError,
 
366
                          k.get,
 
367
                          1)
 
368
 
 
369
 
 
370
class InsertNested(TestBase):
 
371
    """Insertion with nested instructions."""
 
372
    def runTest(self):
 
373
        k = Weave()
 
374
 
 
375
        k._parents = [frozenset(),
 
376
                frozenset([0]),
 
377
                frozenset([0]),
 
378
                frozenset([0,1,2]),
 
379
                ]
 
380
        k._weave = [('{', 0),
 
381
                'foo {',
 
382
                ('{', 1),
 
383
                '  added in version 1',
 
384
                ('{', 2),
 
385
                '  added in v2',
 
386
                ('}', 2),
 
387
                '  also from v1',
 
388
                ('}', 1),
 
389
                '}',
 
390
                ('}', 0)]
 
391
 
 
392
        k._sha1s = [sha_string('foo {}')
 
393
                  , sha_string('foo {  added in version 1  also from v1}')
 
394
                  , sha_string('foo {  added in v2}')
 
395
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
 
396
                  ]
 
397
 
 
398
        self.assertEqual(k.get_lines(0),
 
399
                         ['foo {',
 
400
                          '}'])
 
401
 
 
402
        self.assertEqual(k.get_lines(1),
 
403
                         ['foo {',
 
404
                          '  added in version 1',
 
405
                          '  also from v1',
 
406
                          '}'])
 
407
 
 
408
        self.assertEqual(k.get_lines(2),
 
409
                         ['foo {',
 
410
                          '  added in v2',
 
411
                          '}'])
 
412
 
 
413
        self.assertEqual(k.get_lines(3),
 
414
                         ['foo {',
 
415
                          '  added in version 1',
 
416
                          '  added in v2',
 
417
                          '  also from v1',
 
418
                          '}'])
 
419
 
 
420
 
 
421
class DeleteLines2(TestBase):
 
422
    """Test recording revisions that delete lines.
 
423
 
 
424
    This relies on the weave having a way to represent lines knocked
 
425
    out by a later revision."""
 
426
    def runTest(self):
 
427
        k = Weave()
 
428
 
 
429
        k.add_lines('text0', [], ["line the first",
 
430
                   "line 2",
 
431
                   "line 3",
 
432
                   "fine"])
 
433
 
 
434
        self.assertEqual(len(k.get_lines(0)), 4)
 
435
 
 
436
        k.add_lines('text1', ['text0'], ["line the first",
 
437
                   "fine"])
 
438
 
 
439
        self.assertEqual(k.get_lines(1),
 
440
                         ["line the first",
 
441
                          "fine"])
 
442
 
 
443
        self.assertEqual(k.annotate('text1'),
 
444
                         [('text0', "line the first"),
 
445
                          ('text0', "fine")])
 
446
 
 
447
 
 
448
class IncludeVersions(TestBase):
 
449
    """Check texts that are stored across multiple revisions.
 
450
 
 
451
    Here we manually create a weave with particular encoding and make
 
452
    sure it unpacks properly.
 
453
 
 
454
    Text 0 includes nothing; text 1 includes text 0 and adds some
 
455
    lines.
 
456
    """
 
457
 
 
458
    def runTest(self):
 
459
        k = Weave()
 
460
 
 
461
        k._parents = [frozenset(), frozenset([0])]
 
462
        k._weave = [('{', 0),
 
463
                "first line",
 
464
                ('}', 0),
 
465
                ('{', 1),
 
466
                "second line",
 
467
                ('}', 1)]
 
468
 
 
469
        k._sha1s = [sha_string('first line')
 
470
                  , sha_string('first linesecond line')]
 
471
 
 
472
        self.assertEqual(k.get_lines(1),
 
473
                         ["first line",
 
474
                          "second line"])
 
475
 
 
476
        self.assertEqual(k.get_lines(0),
 
477
                         ["first line"])
 
478
 
 
479
 
 
480
class DivergedIncludes(TestBase):
 
481
    """Weave with two diverged texts based on version 0.
 
482
    """
 
483
    def runTest(self):
 
484
        # FIXME make the weave, dont poke at it.
 
485
        k = Weave()
 
486
 
 
487
        k._names = ['0', '1', '2']
 
488
        k._name_map = {'0':0, '1':1, '2':2}
 
489
        k._parents = [frozenset(),
 
490
                frozenset([0]),
 
491
                frozenset([0]),
 
492
                ]
 
493
        k._weave = [('{', 0),
 
494
                "first line",
 
495
                ('}', 0),
 
496
                ('{', 1),
 
497
                "second line",
 
498
                ('}', 1),
 
499
                ('{', 2),
 
500
                "alternative second line",
 
501
                ('}', 2),
 
502
                ]
 
503
 
 
504
        k._sha1s = [sha_string('first line')
 
505
                  , sha_string('first linesecond line')
 
506
                  , sha_string('first linealternative second line')]
 
507
 
 
508
        self.assertEqual(k.get_lines(0),
 
509
                         ["first line"])
 
510
 
 
511
        self.assertEqual(k.get_lines(1),
 
512
                         ["first line",
 
513
                          "second line"])
 
514
 
 
515
        self.assertEqual(k.get_lines('2'),
 
516
                         ["first line",
 
517
                          "alternative second line"])
 
518
 
 
519
        self.assertEqual(list(k.get_ancestry(['2'])),
 
520
                         ['0', '2'])
 
521
 
 
522
 
 
523
class ReplaceLine(TestBase):
 
524
    def runTest(self):
 
525
        k = Weave()
 
526
 
 
527
        text0 = ['cheddar', 'stilton', 'gruyere']
 
528
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
 
529
 
 
530
        k.add_lines('text0', [], text0)
 
531
        k.add_lines('text1', ['text0'], text1)
 
532
 
 
533
        self.log('k._weave=' + pformat(k._weave))
 
534
 
 
535
        self.assertEqual(k.get_lines(0), text0)
 
536
        self.assertEqual(k.get_lines(1), text1)
 
537
 
 
538
 
 
539
class Merge(TestBase):
 
540
    """Storage of versions that merge diverged parents"""
 
541
 
 
542
    def runTest(self):
 
543
        k = Weave()
 
544
 
 
545
        texts = [['header'],
 
546
                 ['header', '', 'line from 1'],
 
547
                 ['header', '', 'line from 2', 'more from 2'],
 
548
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
 
549
                 ]
 
550
 
 
551
        k.add_lines('text0', [], texts[0])
 
552
        k.add_lines('text1', ['text0'], texts[1])
 
553
        k.add_lines('text2', ['text0'], texts[2])
 
554
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
 
555
 
 
556
        for i, t in enumerate(texts):
 
557
            self.assertEqual(k.get_lines(i), t)
 
558
 
 
559
        self.assertEqual(k.annotate('merge'),
 
560
                         [('text0', 'header'),
 
561
                          ('text1', ''),
 
562
                          ('text1', 'line from 1'),
 
563
                          ('merge', 'fixup line'),
 
564
                          ('text2', 'line from 2'),
 
565
                          ])
 
566
 
 
567
        self.assertEqual(list(k.get_ancestry(['merge'])),
 
568
                         ['text0', 'text1', 'text2', 'merge'])
 
569
 
 
570
        self.log('k._weave=' + pformat(k._weave))
 
571
 
 
572
        self.check_read_write(k)
 
573
 
 
574
 
 
575
class Conflicts(TestBase):
 
576
    """Test detection of conflicting regions during a merge.
 
577
 
 
578
    A base version is inserted, then two descendents try to
 
579
    insert different lines in the same place.  These should be
 
580
    reported as a possible conflict and forwarded to the user."""
 
581
    def runTest(self):
 
582
        return  # NOT RUN
 
583
        k = Weave()
 
584
 
 
585
        k.add_lines([], ['aaa', 'bbb'])
 
586
        k.add_lines([0], ['aaa', '111', 'bbb'])
 
587
        k.add_lines([1], ['aaa', '222', 'bbb'])
 
588
 
 
589
        merged = k.merge([1, 2])
 
590
 
 
591
        self.assertEquals([[['aaa']],
 
592
                           [['111'], ['222']],
 
593
                           [['bbb']]])
 
594
 
 
595
 
 
596
class NonConflict(TestBase):
 
597
    """Two descendants insert compatible changes.
 
598
 
 
599
    No conflict should be reported."""
 
600
    def runTest(self):
 
601
        return  # NOT RUN
 
602
        k = Weave()
 
603
 
 
604
        k.add_lines([], ['aaa', 'bbb'])
 
605
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
 
606
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
 
607
 
 
608
 
 
609
class Khayyam(TestBase):
 
610
    """Test changes to multi-line texts, and read/write"""
 
611
 
 
612
    def test_multi_line_merge(self):
 
613
        rawtexts = [
 
614
            """A Book of Verses underneath the Bough,
 
615
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
616
            Beside me singing in the Wilderness --
 
617
            Oh, Wilderness were Paradise enow!""",
 
618
 
 
619
            """A Book of Verses underneath the Bough,
 
620
            A Jug of Wine, a Loaf of Bread, -- and Thou
 
621
            Beside me singing in the Wilderness --
 
622
            Oh, Wilderness were Paradise now!""",
 
623
 
 
624
            """A Book of poems underneath the tree,
 
625
            A Jug of Wine, a Loaf of Bread,
 
626
            and Thou
 
627
            Beside me singing in the Wilderness --
 
628
            Oh, Wilderness were Paradise now!
 
629
 
 
630
            -- O. Khayyam""",
 
631
 
 
632
            """A Book of Verses underneath the Bough,
 
633
            A Jug of Wine, a Loaf of Bread,
 
634
            and Thou
 
635
            Beside me singing in the Wilderness --
 
636
            Oh, Wilderness were Paradise now!""",
 
637
            ]
 
638
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
 
639
 
 
640
        k = Weave()
 
641
        parents = set()
 
642
        i = 0
 
643
        for t in texts:
 
644
            ver = k.add_lines('text%d' % i,
 
645
                        list(parents), t)
 
646
            parents.add('text%d' % i)
 
647
            i += 1
 
648
 
 
649
        self.log("k._weave=" + pformat(k._weave))
 
650
 
 
651
        for i, t in enumerate(texts):
 
652
            self.assertEqual(k.get_lines(i), t)
 
653
 
 
654
        self.check_read_write(k)
 
655
 
 
656
 
 
657
class JoinWeavesTests(TestBase):
 
658
 
 
659
    def setUp(self):
 
660
        super(JoinWeavesTests, self).setUp()
 
661
        self.weave1 = Weave()
 
662
        self.lines1 = ['hello\n']
 
663
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
 
664
        self.weave1.add_lines('v1', [], self.lines1)
 
665
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
666
        self.weave1.add_lines('v3', ['v2'], self.lines3)
 
667
 
 
668
    def test_written_detection(self):
 
669
        # Test detection of weave file corruption.
 
670
        #
 
671
        # Make sure that we can detect if a weave file has
 
672
        # been corrupted. This doesn't test all forms of corruption,
 
673
        # but it at least helps verify the data you get, is what you want.
 
674
        from cStringIO import StringIO
 
675
 
 
676
        w = Weave()
 
677
        w.add_lines('v1', [], ['hello\n'])
 
678
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
679
 
 
680
        tmpf = StringIO()
 
681
        write_weave(w, tmpf)
 
682
 
 
683
        # Because we are corrupting, we need to make sure we have the exact text
 
684
        self.assertEquals('# bzr weave file v5\n'
 
685
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
686
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
687
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
 
688
                          tmpf.getvalue())
 
689
 
 
690
        # Change a single letter
 
691
        tmpf = StringIO('# bzr weave file v5\n'
 
692
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
693
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
694
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
 
695
 
 
696
        w = read_weave(tmpf)
 
697
 
 
698
        self.assertEqual('hello\n', w.get_text('v1'))
 
699
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
700
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
701
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
702
 
 
703
        # Change the sha checksum
 
704
        tmpf = StringIO('# bzr weave file v5\n'
 
705
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
 
706
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
 
707
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
 
708
 
 
709
        w = read_weave(tmpf)
 
710
 
 
711
        self.assertEqual('hello\n', w.get_text('v1'))
 
712
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
713
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
714
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
715
 
 
716
 
 
717
class TestWeave(TestCase):
 
718
 
 
719
    def test_allow_reserved_false(self):
 
720
        w = Weave('name', allow_reserved=False)
 
721
        # Add lines is checked at the WeaveFile level, not at the Weave level
 
722
        w.add_lines('name:', [], TEXT_1)
 
723
        # But get_lines is checked at this level
 
724
        self.assertRaises(errors.ReservedId, w.get_lines, 'name:')
 
725
 
 
726
    def test_allow_reserved_true(self):
 
727
        w = Weave('name', allow_reserved=True)
 
728
        w.add_lines('name:', [], TEXT_1)
 
729
        self.assertEqual(TEXT_1, w.get_lines('name:'))
 
730
 
 
731
 
 
732
class InstrumentedWeave(Weave):
 
733
    """Keep track of how many times functions are called."""
 
734
 
 
735
    def __init__(self, weave_name=None):
 
736
        self._extract_count = 0
 
737
        Weave.__init__(self, weave_name=weave_name)
 
738
 
 
739
    def _extract(self, versions):
 
740
        self._extract_count += 1
 
741
        return Weave._extract(self, versions)
 
742
 
 
743
 
 
744
class TestNeedsReweave(TestCase):
 
745
    """Internal corner cases for when reweave is needed."""
 
746
 
 
747
    def test_compatible_parents(self):
 
748
        w1 = Weave('a')
 
749
        my_parents = set([1, 2, 3])
 
750
        # subsets are ok
 
751
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
 
752
        # same sets
 
753
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
 
754
        # same empty corner case
 
755
        self.assertTrue(w1._compatible_parents(set(), set()))
 
756
        # other cannot contain stuff my_parents does not
 
757
        self.assertFalse(w1._compatible_parents(set(), set([1])))
 
758
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
 
759
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
 
760
 
 
761
 
 
762
class TestWeaveFile(TestCaseInTempDir):
 
763
 
 
764
    def test_empty_file(self):
 
765
        f = open('empty.weave', 'wb+')
 
766
        try:
 
767
            self.assertRaises(errors.WeaveFormatError,
 
768
                              read_weave, f)
 
769
        finally:
 
770
            f.close()