~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_weave.py

  • Committer: Ian Clatworthy
  • Date: 2009-03-02 06:35:43 UTC
  • mto: (0.23.30 groupcompress_rabin)
  • mto: This revision was merged to the branch mainline in revision 4280.
  • Revision ID: ian.clatworthy@canonical.com-20090302063543-czciz25uppelwhv0
groupcompress.py code cleanups

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 Canonical Ltd
2
 
#
3
 
# This program is free software; you can redistribute it and/or modify
4
 
# it under the terms of the GNU General Public License as published by
5
 
# the Free Software Foundation; either version 2 of the License, or
6
 
# (at your option) any later version.
7
 
#
8
 
# This program is distributed in the hope that it will be useful,
9
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
# GNU General Public License for more details.
12
 
#
13
 
# You should have received a copy of the GNU General Public License
14
 
# along with this program; if not, write to the Free Software
15
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
16
 
 
17
 
 
18
 
# TODO: tests regarding version names
19
 
# TODO: rbc 20050108 test that join does not leave an inconsistent weave
20
 
#       if it fails.
21
 
 
22
 
"""test suite for weave algorithm"""
23
 
 
24
 
from pprint import pformat
25
 
 
26
 
from bzrlib import (
27
 
    errors,
28
 
    )
29
 
from bzrlib.osutils import sha_string
30
 
from bzrlib.tests import TestCase, TestCaseInTempDir
31
 
from bzrlib.weave import Weave, WeaveFormatError, WeaveError
32
 
from bzrlib.weavefile import write_weave, read_weave
33
 
 
34
 
 
35
 
# texts for use in testing
36
 
TEXT_0 = ["Hello world"]
37
 
TEXT_1 = ["Hello world",
38
 
          "A second line"]
39
 
 
40
 
 
41
 
class TestBase(TestCase):
42
 
 
43
 
    def check_read_write(self, k):
44
 
        """Check the weave k can be written & re-read."""
45
 
        from tempfile import TemporaryFile
46
 
        tf = TemporaryFile()
47
 
 
48
 
        write_weave(k, tf)
49
 
        tf.seek(0)
50
 
        k2 = read_weave(tf)
51
 
 
52
 
        if k != k2:
53
 
            tf.seek(0)
54
 
            self.log('serialized weave:')
55
 
            self.log(tf.read())
56
 
 
57
 
            self.log('')
58
 
            self.log('parents: %s' % (k._parents == k2._parents))
59
 
            self.log('         %r' % k._parents)
60
 
            self.log('         %r' % k2._parents)
61
 
            self.log('')
62
 
            self.fail('read/write check failed')
63
 
 
64
 
 
65
 
class WeaveContains(TestBase):
66
 
    """Weave __contains__ operator"""
67
 
    def runTest(self):
68
 
        k = Weave(get_scope=lambda:None)
69
 
        self.assertFalse('foo' in k)
70
 
        k.add_lines('foo', [], TEXT_1)
71
 
        self.assertTrue('foo' in k)
72
 
 
73
 
 
74
 
class Easy(TestBase):
75
 
    def runTest(self):
76
 
        k = Weave()
77
 
 
78
 
 
79
 
class AnnotateOne(TestBase):
80
 
    def runTest(self):
81
 
        k = Weave()
82
 
        k.add_lines('text0', [], TEXT_0)
83
 
        self.assertEqual(k.annotate('text0'),
84
 
                         [('text0', TEXT_0[0])])
85
 
 
86
 
 
87
 
class InvalidAdd(TestBase):
88
 
    """Try to use invalid version number during add."""
89
 
    def runTest(self):
90
 
        k = Weave()
91
 
 
92
 
        self.assertRaises(errors.RevisionNotPresent,
93
 
                          k.add_lines,
94
 
                          'text0',
95
 
                          ['69'],
96
 
                          ['new text!'])
97
 
 
98
 
 
99
 
class RepeatedAdd(TestBase):
100
 
    """Add the same version twice; harmless."""
101
 
 
102
 
    def test_duplicate_add(self):
103
 
        k = Weave()
104
 
        idx = k.add_lines('text0', [], TEXT_0)
105
 
        idx2 = k.add_lines('text0', [], TEXT_0)
106
 
        self.assertEqual(idx, idx2)
107
 
 
108
 
 
109
 
class InvalidRepeatedAdd(TestBase):
110
 
    def runTest(self):
111
 
        k = Weave()
112
 
        k.add_lines('basis', [], TEXT_0)
113
 
        idx = k.add_lines('text0', [], TEXT_0)
114
 
        self.assertRaises(errors.RevisionAlreadyPresent,
115
 
                          k.add_lines,
116
 
                          'text0',
117
 
                          [],
118
 
                          ['not the same text'])
119
 
        self.assertRaises(errors.RevisionAlreadyPresent,
120
 
                          k.add_lines,
121
 
                          'text0',
122
 
                          ['basis'],         # not the right parents
123
 
                          TEXT_0)
124
 
 
125
 
 
126
 
class InsertLines(TestBase):
127
 
    """Store a revision that adds one line to the original.
128
 
 
129
 
    Look at the annotations to make sure that the first line is matched
130
 
    and not stored repeatedly."""
131
 
    def runTest(self):
132
 
        k = Weave()
133
 
 
134
 
        k.add_lines('text0', [], ['line 1'])
135
 
        k.add_lines('text1', ['text0'], ['line 1', 'line 2'])
136
 
 
137
 
        self.assertEqual(k.annotate('text0'),
138
 
                         [('text0', 'line 1')])
139
 
 
140
 
        self.assertEqual(k.get_lines(1),
141
 
                         ['line 1',
142
 
                          'line 2'])
143
 
 
144
 
        self.assertEqual(k.annotate('text1'),
145
 
                         [('text0', 'line 1'),
146
 
                          ('text1', 'line 2')])
147
 
 
148
 
        k.add_lines('text2', ['text0'], ['line 1', 'diverged line'])
149
 
 
150
 
        self.assertEqual(k.annotate('text2'),
151
 
                         [('text0', 'line 1'),
152
 
                          ('text2', 'diverged line')])
153
 
 
154
 
        text3 = ['line 1', 'middle line', 'line 2']
155
 
        k.add_lines('text3',
156
 
              ['text0', 'text1'],
157
 
              text3)
158
 
 
159
 
        # self.log("changes to text3: " + pformat(list(k._delta(set([0, 1]), text3))))
160
 
 
161
 
        self.log("k._weave=" + pformat(k._weave))
162
 
 
163
 
        self.assertEqual(k.annotate('text3'),
164
 
                         [('text0', 'line 1'),
165
 
                          ('text3', 'middle line'),
166
 
                          ('text1', 'line 2')])
167
 
 
168
 
        # now multiple insertions at different places
169
 
        k.add_lines('text4',
170
 
              ['text0', 'text1', 'text3'],
171
 
              ['line 1', 'aaa', 'middle line', 'bbb', 'line 2', 'ccc'])
172
 
 
173
 
        self.assertEqual(k.annotate('text4'),
174
 
                         [('text0', 'line 1'),
175
 
                          ('text4', 'aaa'),
176
 
                          ('text3', 'middle line'),
177
 
                          ('text4', 'bbb'),
178
 
                          ('text1', 'line 2'),
179
 
                          ('text4', 'ccc')])
180
 
 
181
 
 
182
 
class DeleteLines(TestBase):
183
 
    """Deletion of lines from existing text.
184
 
 
185
 
    Try various texts all based on a common ancestor."""
186
 
    def runTest(self):
187
 
        k = Weave()
188
 
 
189
 
        base_text = ['one', 'two', 'three', 'four']
190
 
 
191
 
        k.add_lines('text0', [], base_text)
192
 
 
193
 
        texts = [['one', 'two', 'three'],
194
 
                 ['two', 'three', 'four'],
195
 
                 ['one', 'four'],
196
 
                 ['one', 'two', 'three', 'four'],
197
 
                 ]
198
 
 
199
 
        i = 1
200
 
        for t in texts:
201
 
            ver = k.add_lines('text%d' % i,
202
 
                        ['text0'], t)
203
 
            i += 1
204
 
 
205
 
        self.log('final weave:')
206
 
        self.log('k._weave=' + pformat(k._weave))
207
 
 
208
 
        for i in range(len(texts)):
209
 
            self.assertEqual(k.get_lines(i+1),
210
 
                             texts[i])
211
 
 
212
 
 
213
 
class SuicideDelete(TestBase):
214
 
    """Invalid weave which tries to add and delete simultaneously."""
215
 
    def runTest(self):
216
 
        k = Weave()
217
 
 
218
 
        k._parents = [(),
219
 
                ]
220
 
        k._weave = [('{', 0),
221
 
                'first line',
222
 
                ('[', 0),
223
 
                'deleted in 0',
224
 
                (']', 0),
225
 
                ('}', 0),
226
 
                ]
227
 
        ################################### SKIPPED
228
 
        # Weave.get doesn't trap this anymore
229
 
        return
230
 
 
231
 
        self.assertRaises(WeaveFormatError,
232
 
                          k.get_lines,
233
 
                          0)
234
 
 
235
 
 
236
 
class CannedDelete(TestBase):
237
 
    """Unpack canned weave with deleted lines."""
238
 
    def runTest(self):
239
 
        k = Weave()
240
 
 
241
 
        k._parents = [(),
242
 
                frozenset([0]),
243
 
                ]
244
 
        k._weave = [('{', 0),
245
 
                'first line',
246
 
                ('[', 1),
247
 
                'line to be deleted',
248
 
                (']', 1),
249
 
                'last line',
250
 
                ('}', 0),
251
 
                ]
252
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
253
 
                  , sha_string('first linelast line')]
254
 
 
255
 
        self.assertEqual(k.get_lines(0),
256
 
                         ['first line',
257
 
                          'line to be deleted',
258
 
                          'last line',
259
 
                          ])
260
 
 
261
 
        self.assertEqual(k.get_lines(1),
262
 
                         ['first line',
263
 
                          'last line',
264
 
                          ])
265
 
 
266
 
 
267
 
class CannedReplacement(TestBase):
268
 
    """Unpack canned weave with deleted lines."""
269
 
    def runTest(self):
270
 
        k = Weave()
271
 
 
272
 
        k._parents = [frozenset(),
273
 
                frozenset([0]),
274
 
                ]
275
 
        k._weave = [('{', 0),
276
 
                'first line',
277
 
                ('[', 1),
278
 
                'line to be deleted',
279
 
                (']', 1),
280
 
                ('{', 1),
281
 
                'replacement line',
282
 
                ('}', 1),
283
 
                'last line',
284
 
                ('}', 0),
285
 
                ]
286
 
        k._sha1s = [sha_string('first lineline to be deletedlast line')
287
 
                  , sha_string('first linereplacement linelast line')]
288
 
 
289
 
        self.assertEqual(k.get_lines(0),
290
 
                         ['first line',
291
 
                          'line to be deleted',
292
 
                          'last line',
293
 
                          ])
294
 
 
295
 
        self.assertEqual(k.get_lines(1),
296
 
                         ['first line',
297
 
                          'replacement line',
298
 
                          'last line',
299
 
                          ])
300
 
 
301
 
 
302
 
class BadWeave(TestBase):
303
 
    """Test that we trap an insert which should not occur."""
304
 
    def runTest(self):
305
 
        k = Weave()
306
 
 
307
 
        k._parents = [frozenset(),
308
 
                ]
309
 
        k._weave = ['bad line',
310
 
                ('{', 0),
311
 
                'foo {',
312
 
                ('{', 1),
313
 
                '  added in version 1',
314
 
                ('{', 2),
315
 
                '  added in v2',
316
 
                ('}', 2),
317
 
                '  also from v1',
318
 
                ('}', 1),
319
 
                '}',
320
 
                ('}', 0)]
321
 
 
322
 
        ################################### SKIPPED
323
 
        # Weave.get doesn't trap this anymore
324
 
        return
325
 
 
326
 
 
327
 
        self.assertRaises(WeaveFormatError,
328
 
                          k.get,
329
 
                          0)
330
 
 
331
 
 
332
 
class BadInsert(TestBase):
333
 
    """Test that we trap an insert which should not occur."""
334
 
    def runTest(self):
335
 
        k = Weave()
336
 
 
337
 
        k._parents = [frozenset(),
338
 
                frozenset([0]),
339
 
                frozenset([0]),
340
 
                frozenset([0,1,2]),
341
 
                ]
342
 
        k._weave = [('{', 0),
343
 
                'foo {',
344
 
                ('{', 1),
345
 
                '  added in version 1',
346
 
                ('{', 1),
347
 
                '  more in 1',
348
 
                ('}', 1),
349
 
                ('}', 1),
350
 
                ('}', 0)]
351
 
 
352
 
 
353
 
        # this is not currently enforced by get
354
 
        return  ##########################################
355
 
 
356
 
        self.assertRaises(WeaveFormatError,
357
 
                          k.get,
358
 
                          0)
359
 
 
360
 
        self.assertRaises(WeaveFormatError,
361
 
                          k.get,
362
 
                          1)
363
 
 
364
 
 
365
 
class InsertNested(TestBase):
366
 
    """Insertion with nested instructions."""
367
 
    def runTest(self):
368
 
        k = Weave()
369
 
 
370
 
        k._parents = [frozenset(),
371
 
                frozenset([0]),
372
 
                frozenset([0]),
373
 
                frozenset([0,1,2]),
374
 
                ]
375
 
        k._weave = [('{', 0),
376
 
                'foo {',
377
 
                ('{', 1),
378
 
                '  added in version 1',
379
 
                ('{', 2),
380
 
                '  added in v2',
381
 
                ('}', 2),
382
 
                '  also from v1',
383
 
                ('}', 1),
384
 
                '}',
385
 
                ('}', 0)]
386
 
 
387
 
        k._sha1s = [sha_string('foo {}')
388
 
                  , sha_string('foo {  added in version 1  also from v1}')
389
 
                  , sha_string('foo {  added in v2}')
390
 
                  , sha_string('foo {  added in version 1  added in v2  also from v1}')
391
 
                  ]
392
 
 
393
 
        self.assertEqual(k.get_lines(0),
394
 
                         ['foo {',
395
 
                          '}'])
396
 
 
397
 
        self.assertEqual(k.get_lines(1),
398
 
                         ['foo {',
399
 
                          '  added in version 1',
400
 
                          '  also from v1',
401
 
                          '}'])
402
 
 
403
 
        self.assertEqual(k.get_lines(2),
404
 
                         ['foo {',
405
 
                          '  added in v2',
406
 
                          '}'])
407
 
 
408
 
        self.assertEqual(k.get_lines(3),
409
 
                         ['foo {',
410
 
                          '  added in version 1',
411
 
                          '  added in v2',
412
 
                          '  also from v1',
413
 
                          '}'])
414
 
 
415
 
 
416
 
class DeleteLines2(TestBase):
417
 
    """Test recording revisions that delete lines.
418
 
 
419
 
    This relies on the weave having a way to represent lines knocked
420
 
    out by a later revision."""
421
 
    def runTest(self):
422
 
        k = Weave()
423
 
 
424
 
        k.add_lines('text0', [], ["line the first",
425
 
                   "line 2",
426
 
                   "line 3",
427
 
                   "fine"])
428
 
 
429
 
        self.assertEqual(len(k.get_lines(0)), 4)
430
 
 
431
 
        k.add_lines('text1', ['text0'], ["line the first",
432
 
                   "fine"])
433
 
 
434
 
        self.assertEqual(k.get_lines(1),
435
 
                         ["line the first",
436
 
                          "fine"])
437
 
 
438
 
        self.assertEqual(k.annotate('text1'),
439
 
                         [('text0', "line the first"),
440
 
                          ('text0', "fine")])
441
 
 
442
 
 
443
 
class IncludeVersions(TestBase):
444
 
    """Check texts that are stored across multiple revisions.
445
 
 
446
 
    Here we manually create a weave with particular encoding and make
447
 
    sure it unpacks properly.
448
 
 
449
 
    Text 0 includes nothing; text 1 includes text 0 and adds some
450
 
    lines.
451
 
    """
452
 
 
453
 
    def runTest(self):
454
 
        k = Weave()
455
 
 
456
 
        k._parents = [frozenset(), frozenset([0])]
457
 
        k._weave = [('{', 0),
458
 
                "first line",
459
 
                ('}', 0),
460
 
                ('{', 1),
461
 
                "second line",
462
 
                ('}', 1)]
463
 
 
464
 
        k._sha1s = [sha_string('first line')
465
 
                  , sha_string('first linesecond line')]
466
 
 
467
 
        self.assertEqual(k.get_lines(1),
468
 
                         ["first line",
469
 
                          "second line"])
470
 
 
471
 
        self.assertEqual(k.get_lines(0),
472
 
                         ["first line"])
473
 
 
474
 
 
475
 
class DivergedIncludes(TestBase):
476
 
    """Weave with two diverged texts based on version 0.
477
 
    """
478
 
    def runTest(self):
479
 
        # FIXME make the weave, dont poke at it.
480
 
        k = Weave()
481
 
 
482
 
        k._names = ['0', '1', '2']
483
 
        k._name_map = {'0':0, '1':1, '2':2}
484
 
        k._parents = [frozenset(),
485
 
                frozenset([0]),
486
 
                frozenset([0]),
487
 
                ]
488
 
        k._weave = [('{', 0),
489
 
                "first line",
490
 
                ('}', 0),
491
 
                ('{', 1),
492
 
                "second line",
493
 
                ('}', 1),
494
 
                ('{', 2),
495
 
                "alternative second line",
496
 
                ('}', 2),
497
 
                ]
498
 
 
499
 
        k._sha1s = [sha_string('first line')
500
 
                  , sha_string('first linesecond line')
501
 
                  , sha_string('first linealternative second line')]
502
 
 
503
 
        self.assertEqual(k.get_lines(0),
504
 
                         ["first line"])
505
 
 
506
 
        self.assertEqual(k.get_lines(1),
507
 
                         ["first line",
508
 
                          "second line"])
509
 
 
510
 
        self.assertEqual(k.get_lines('2'),
511
 
                         ["first line",
512
 
                          "alternative second line"])
513
 
 
514
 
        self.assertEqual(list(k.get_ancestry(['2'])),
515
 
                         ['0', '2'])
516
 
 
517
 
 
518
 
class ReplaceLine(TestBase):
519
 
    def runTest(self):
520
 
        k = Weave()
521
 
 
522
 
        text0 = ['cheddar', 'stilton', 'gruyere']
523
 
        text1 = ['cheddar', 'blue vein', 'neufchatel', 'chevre']
524
 
 
525
 
        k.add_lines('text0', [], text0)
526
 
        k.add_lines('text1', ['text0'], text1)
527
 
 
528
 
        self.log('k._weave=' + pformat(k._weave))
529
 
 
530
 
        self.assertEqual(k.get_lines(0), text0)
531
 
        self.assertEqual(k.get_lines(1), text1)
532
 
 
533
 
 
534
 
class Merge(TestBase):
535
 
    """Storage of versions that merge diverged parents"""
536
 
    def runTest(self):
537
 
        k = Weave()
538
 
 
539
 
        texts = [['header'],
540
 
                 ['header', '', 'line from 1'],
541
 
                 ['header', '', 'line from 2', 'more from 2'],
542
 
                 ['header', '', 'line from 1', 'fixup line', 'line from 2'],
543
 
                 ]
544
 
 
545
 
        k.add_lines('text0', [], texts[0])
546
 
        k.add_lines('text1', ['text0'], texts[1])
547
 
        k.add_lines('text2', ['text0'], texts[2])
548
 
        k.add_lines('merge', ['text0', 'text1', 'text2'], texts[3])
549
 
 
550
 
        for i, t in enumerate(texts):
551
 
            self.assertEqual(k.get_lines(i), t)
552
 
 
553
 
        self.assertEqual(k.annotate('merge'),
554
 
                         [('text0', 'header'),
555
 
                          ('text1', ''),
556
 
                          ('text1', 'line from 1'),
557
 
                          ('merge', 'fixup line'),
558
 
                          ('text2', 'line from 2'),
559
 
                          ])
560
 
 
561
 
        self.assertEqual(list(k.get_ancestry(['merge'])),
562
 
                         ['text0', 'text1', 'text2', 'merge'])
563
 
 
564
 
        self.log('k._weave=' + pformat(k._weave))
565
 
 
566
 
        self.check_read_write(k)
567
 
 
568
 
 
569
 
class Conflicts(TestBase):
570
 
    """Test detection of conflicting regions during a merge.
571
 
 
572
 
    A base version is inserted, then two descendents try to
573
 
    insert different lines in the same place.  These should be
574
 
    reported as a possible conflict and forwarded to the user."""
575
 
    def runTest(self):
576
 
        return  # NOT RUN
577
 
        k = Weave()
578
 
 
579
 
        k.add_lines([], ['aaa', 'bbb'])
580
 
        k.add_lines([0], ['aaa', '111', 'bbb'])
581
 
        k.add_lines([1], ['aaa', '222', 'bbb'])
582
 
 
583
 
        merged = k.merge([1, 2])
584
 
 
585
 
        self.assertEquals([[['aaa']],
586
 
                           [['111'], ['222']],
587
 
                           [['bbb']]])
588
 
 
589
 
 
590
 
class NonConflict(TestBase):
591
 
    """Two descendants insert compatible changes.
592
 
 
593
 
    No conflict should be reported."""
594
 
    def runTest(self):
595
 
        return  # NOT RUN
596
 
        k = Weave()
597
 
 
598
 
        k.add_lines([], ['aaa', 'bbb'])
599
 
        k.add_lines([0], ['111', 'aaa', 'ccc', 'bbb'])
600
 
        k.add_lines([1], ['aaa', 'ccc', 'bbb', '222'])
601
 
 
602
 
 
603
 
class Khayyam(TestBase):
604
 
    """Test changes to multi-line texts, and read/write"""
605
 
 
606
 
    def test_multi_line_merge(self):
607
 
        rawtexts = [
608
 
            """A Book of Verses underneath the Bough,
609
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
610
 
            Beside me singing in the Wilderness --
611
 
            Oh, Wilderness were Paradise enow!""",
612
 
 
613
 
            """A Book of Verses underneath the Bough,
614
 
            A Jug of Wine, a Loaf of Bread, -- and Thou
615
 
            Beside me singing in the Wilderness --
616
 
            Oh, Wilderness were Paradise now!""",
617
 
 
618
 
            """A Book of poems underneath the tree,
619
 
            A Jug of Wine, a Loaf of Bread,
620
 
            and Thou
621
 
            Beside me singing in the Wilderness --
622
 
            Oh, Wilderness were Paradise now!
623
 
 
624
 
            -- O. Khayyam""",
625
 
 
626
 
            """A Book of Verses underneath the Bough,
627
 
            A Jug of Wine, a Loaf of Bread,
628
 
            and Thou
629
 
            Beside me singing in the Wilderness --
630
 
            Oh, Wilderness were Paradise now!""",
631
 
            ]
632
 
        texts = [[l.strip() for l in t.split('\n')] for t in rawtexts]
633
 
 
634
 
        k = Weave()
635
 
        parents = set()
636
 
        i = 0
637
 
        for t in texts:
638
 
            ver = k.add_lines('text%d' % i,
639
 
                        list(parents), t)
640
 
            parents.add('text%d' % i)
641
 
            i += 1
642
 
 
643
 
        self.log("k._weave=" + pformat(k._weave))
644
 
 
645
 
        for i, t in enumerate(texts):
646
 
            self.assertEqual(k.get_lines(i), t)
647
 
 
648
 
        self.check_read_write(k)
649
 
 
650
 
 
651
 
class JoinWeavesTests(TestBase):
652
 
    def setUp(self):
653
 
        super(JoinWeavesTests, self).setUp()
654
 
        self.weave1 = Weave()
655
 
        self.lines1 = ['hello\n']
656
 
        self.lines3 = ['hello\n', 'cruel\n', 'world\n']
657
 
        self.weave1.add_lines('v1', [], self.lines1)
658
 
        self.weave1.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
659
 
        self.weave1.add_lines('v3', ['v2'], self.lines3)
660
 
 
661
 
    def test_written_detection(self):
662
 
        # Test detection of weave file corruption.
663
 
        #
664
 
        # Make sure that we can detect if a weave file has
665
 
        # been corrupted. This doesn't test all forms of corruption,
666
 
        # but it at least helps verify the data you get, is what you want.
667
 
        from cStringIO import StringIO
668
 
 
669
 
        w = Weave()
670
 
        w.add_lines('v1', [], ['hello\n'])
671
 
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
672
 
 
673
 
        tmpf = StringIO()
674
 
        write_weave(w, tmpf)
675
 
 
676
 
        # Because we are corrupting, we need to make sure we have the exact text
677
 
        self.assertEquals('# bzr weave file v5\n'
678
 
                          'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
679
 
                          'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
680
 
                          'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n',
681
 
                          tmpf.getvalue())
682
 
 
683
 
        # Change a single letter
684
 
        tmpf = StringIO('# bzr weave file v5\n'
685
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
686
 
                        'i 0\n1 90f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
687
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. There\n}\nW\n')
688
 
 
689
 
        w = read_weave(tmpf)
690
 
 
691
 
        self.assertEqual('hello\n', w.get_text('v1'))
692
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
693
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
694
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
695
 
 
696
 
        # Change the sha checksum
697
 
        tmpf = StringIO('# bzr weave file v5\n'
698
 
                        'i\n1 f572d396fae9206628714fb2ce00f72e94f2258f\nn v1\n\n'
699
 
                        'i 0\n1 f0f265c6e75f1c8f9ab76dcf85528352c5f215ef\nn v2\n\n'
700
 
                        'w\n{ 0\n. hello\n}\n{ 1\n. there\n}\nW\n')
701
 
 
702
 
        w = read_weave(tmpf)
703
 
 
704
 
        self.assertEqual('hello\n', w.get_text('v1'))
705
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
706
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
707
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
708
 
 
709
 
 
710
 
class TestWeave(TestCase):
711
 
 
712
 
    def test_allow_reserved_false(self):
713
 
        w = Weave('name', allow_reserved=False)
714
 
        # Add lines is checked at the WeaveFile level, not at the Weave level
715
 
        w.add_lines('name:', [], TEXT_1)
716
 
        # But get_lines is checked at this level
717
 
        self.assertRaises(errors.ReservedId, w.get_lines, 'name:')
718
 
 
719
 
    def test_allow_reserved_true(self):
720
 
        w = Weave('name', allow_reserved=True)
721
 
        w.add_lines('name:', [], TEXT_1)
722
 
        self.assertEqual(TEXT_1, w.get_lines('name:'))
723
 
 
724
 
 
725
 
class InstrumentedWeave(Weave):
726
 
    """Keep track of how many times functions are called."""
727
 
 
728
 
    def __init__(self, weave_name=None):
729
 
        self._extract_count = 0
730
 
        Weave.__init__(self, weave_name=weave_name)
731
 
 
732
 
    def _extract(self, versions):
733
 
        self._extract_count += 1
734
 
        return Weave._extract(self, versions)
735
 
 
736
 
 
737
 
class TestNeedsReweave(TestCase):
738
 
    """Internal corner cases for when reweave is needed."""
739
 
 
740
 
    def test_compatible_parents(self):
741
 
        w1 = Weave('a')
742
 
        my_parents = set([1, 2, 3])
743
 
        # subsets are ok
744
 
        self.assertTrue(w1._compatible_parents(my_parents, set([3])))
745
 
        # same sets
746
 
        self.assertTrue(w1._compatible_parents(my_parents, set(my_parents)))
747
 
        # same empty corner case
748
 
        self.assertTrue(w1._compatible_parents(set(), set()))
749
 
        # other cannot contain stuff my_parents does not
750
 
        self.assertFalse(w1._compatible_parents(set(), set([1])))
751
 
        self.assertFalse(w1._compatible_parents(my_parents, set([1, 2, 3, 4])))
752
 
        self.assertFalse(w1._compatible_parents(my_parents, set([4])))
753
 
 
754
 
 
755
 
class TestWeaveFile(TestCaseInTempDir):
756
 
 
757
 
    def test_empty_file(self):
758
 
        f = open('empty.weave', 'wb+')
759
 
        try:
760
 
            self.assertRaises(errors.WeaveFormatError,
761
 
                              read_weave, f)
762
 
        finally:
763
 
            f.close()