~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Ian Clatworthy
  • Date: 2007-09-10 06:27:40 UTC
  • mto: (2818.1.1 ianc-integration)
  • mto: This revision was merged to the branch mainline in revision 2819.
  • Revision ID: ian.clatworthy@internode.on.net-20070910062740-kkj4776w6snhf0vl
more win32 path friendliness

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
#
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
# TODO: might be nice to create a versionedfile with some type of corruption
 
22
# considered typical and check that it can be detected/corrected.
 
23
 
 
24
from StringIO import StringIO
 
25
 
 
26
import bzrlib
 
27
from bzrlib import (
 
28
    errors,
 
29
    osutils,
 
30
    progress,
 
31
    )
 
32
from bzrlib.errors import (
 
33
                           RevisionNotPresent, 
 
34
                           RevisionAlreadyPresent,
 
35
                           WeaveParentMismatch
 
36
                           )
 
37
from bzrlib.knit import (
 
38
    KnitVersionedFile,
 
39
    KnitAnnotateFactory,
 
40
    KnitPlainFactory,
 
41
    )
 
42
from bzrlib.tests import TestCaseWithMemoryTransport, TestSkipped
 
43
from bzrlib.tests.HTTPTestUtil import TestCaseWithWebserver
 
44
from bzrlib.trace import mutter
 
45
from bzrlib.transport import get_transport
 
46
from bzrlib.transport.memory import MemoryTransport
 
47
from bzrlib.tsort import topo_sort
 
48
import bzrlib.versionedfile as versionedfile
 
49
from bzrlib.weave import WeaveFile
 
50
from bzrlib.weavefile import read_weave, write_weave
 
51
 
 
52
 
 
53
class VersionedFileTestMixIn(object):
 
54
    """A mixin test class for testing VersionedFiles.
 
55
 
 
56
    This is not an adaptor-style test at this point because
 
57
    theres no dynamic substitution of versioned file implementations,
 
58
    they are strictly controlled by their owning repositories.
 
59
    """
 
60
 
 
61
    def test_add(self):
 
62
        f = self.get_file()
 
63
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
64
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
65
        def verify_file(f):
 
66
            versions = f.versions()
 
67
            self.assertTrue('r0' in versions)
 
68
            self.assertTrue('r1' in versions)
 
69
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
70
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
71
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
72
            self.assertEqual(2, len(f))
 
73
            self.assertEqual(2, f.num_versions())
 
74
    
 
75
            self.assertRaises(RevisionNotPresent,
 
76
                f.add_lines, 'r2', ['foo'], [])
 
77
            self.assertRaises(RevisionAlreadyPresent,
 
78
                f.add_lines, 'r1', [], [])
 
79
        verify_file(f)
 
80
        # this checks that reopen with create=True does not break anything.
 
81
        f = self.reopen_file(create=True)
 
82
        verify_file(f)
 
83
 
 
84
    def test_adds_with_parent_texts(self):
 
85
        f = self.get_file()
 
86
        parent_texts = {}
 
87
        _, _, parent_texts['r0'] = f.add_lines('r0', [], ['a\n', 'b\n'])
 
88
        try:
 
89
            _, _, parent_texts['r1'] = f.add_lines_with_ghosts('r1',
 
90
                ['r0', 'ghost'], ['b\n', 'c\n'], parent_texts=parent_texts)
 
91
        except NotImplementedError:
 
92
            # if the format doesn't support ghosts, just add normally.
 
93
            _, _, parent_texts['r1'] = f.add_lines('r1',
 
94
                ['r0'], ['b\n', 'c\n'], parent_texts=parent_texts)
 
95
        f.add_lines('r2', ['r1'], ['c\n', 'd\n'], parent_texts=parent_texts)
 
96
        self.assertNotEqual(None, parent_texts['r0'])
 
97
        self.assertNotEqual(None, parent_texts['r1'])
 
98
        def verify_file(f):
 
99
            versions = f.versions()
 
100
            self.assertTrue('r0' in versions)
 
101
            self.assertTrue('r1' in versions)
 
102
            self.assertTrue('r2' in versions)
 
103
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
104
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
105
            self.assertEquals(f.get_lines('r2'), ['c\n', 'd\n'])
 
106
            self.assertEqual(3, f.num_versions())
 
107
            origins = f.annotate('r1')
 
108
            self.assertEquals(origins[0][0], 'r0')
 
109
            self.assertEquals(origins[1][0], 'r1')
 
110
            origins = f.annotate('r2')
 
111
            self.assertEquals(origins[0][0], 'r1')
 
112
            self.assertEquals(origins[1][0], 'r2')
 
113
 
 
114
        verify_file(f)
 
115
        f = self.reopen_file()
 
116
        verify_file(f)
 
117
 
 
118
    def test_add_unicode_content(self):
 
119
        # unicode content is not permitted in versioned files. 
 
120
        # versioned files version sequences of bytes only.
 
121
        vf = self.get_file()
 
122
        self.assertRaises(errors.BzrBadParameterUnicode,
 
123
            vf.add_lines, 'a', [], ['a\n', u'b\n', 'c\n'])
 
124
        self.assertRaises(
 
125
            (errors.BzrBadParameterUnicode, NotImplementedError),
 
126
            vf.add_lines_with_ghosts, 'a', [], ['a\n', u'b\n', 'c\n'])
 
127
 
 
128
    def test_add_follows_left_matching_blocks(self):
 
129
        """If we change left_matching_blocks, delta changes
 
130
 
 
131
        Note: There are multiple correct deltas in this case, because
 
132
        we start with 1 "a" and we get 3.
 
133
        """
 
134
        vf = self.get_file()
 
135
        if isinstance(vf, WeaveFile):
 
136
            raise TestSkipped("WeaveFile ignores left_matching_blocks")
 
137
        vf.add_lines('1', [], ['a\n'])
 
138
        vf.add_lines('2', ['1'], ['a\n', 'a\n', 'a\n'],
 
139
                     left_matching_blocks=[(0, 0, 1), (1, 3, 0)])
 
140
        self.assertEqual(['a\n', 'a\n', 'a\n'], vf.get_lines('2'))
 
141
        vf.add_lines('3', ['1'], ['a\n', 'a\n', 'a\n'],
 
142
                     left_matching_blocks=[(0, 2, 1), (1, 3, 0)])
 
143
        self.assertEqual(['a\n', 'a\n', 'a\n'], vf.get_lines('3'))
 
144
 
 
145
    def test_inline_newline_throws(self):
 
146
        # \r characters are not permitted in lines being added
 
147
        vf = self.get_file()
 
148
        self.assertRaises(errors.BzrBadParameterContainsNewline, 
 
149
            vf.add_lines, 'a', [], ['a\n\n'])
 
150
        self.assertRaises(
 
151
            (errors.BzrBadParameterContainsNewline, NotImplementedError),
 
152
            vf.add_lines_with_ghosts, 'a', [], ['a\n\n'])
 
153
        # but inline CR's are allowed
 
154
        vf.add_lines('a', [], ['a\r\n'])
 
155
        try:
 
156
            vf.add_lines_with_ghosts('b', [], ['a\r\n'])
 
157
        except NotImplementedError:
 
158
            pass
 
159
 
 
160
    def test_add_reserved(self):
 
161
        vf = self.get_file()
 
162
        self.assertRaises(errors.ReservedId,
 
163
            vf.add_lines, 'a:', [], ['a\n', 'b\n', 'c\n'])
 
164
 
 
165
    def test_add_lines_nostoresha(self):
 
166
        """When nostore_sha is supplied using old content raises."""
 
167
        vf = self.get_file()
 
168
        empty_text = ('a', [])
 
169
        sample_text_nl = ('b', ["foo\n", "bar\n"])
 
170
        sample_text_no_nl = ('c', ["foo\n", "bar"])
 
171
        shas = []
 
172
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
 
173
            sha, _, _ = vf.add_lines(version, [], lines)
 
174
            shas.append(sha)
 
175
        # we now have a copy of all the lines in the vf.
 
176
        for sha, (version, lines) in zip(
 
177
            shas, (empty_text, sample_text_nl, sample_text_no_nl)):
 
178
            self.assertRaises(errors.ExistingContent,
 
179
                vf.add_lines, version + "2", [], lines,
 
180
                nostore_sha=sha)
 
181
            # and no new version should have been added.
 
182
            self.assertRaises(errors.RevisionNotPresent, vf.get_lines,
 
183
                version + "2")
 
184
 
 
185
    def test_add_lines_with_ghosts_nostoresha(self):
 
186
        """When nostore_sha is supplied using old content raises."""
 
187
        vf = self.get_file()
 
188
        empty_text = ('a', [])
 
189
        sample_text_nl = ('b', ["foo\n", "bar\n"])
 
190
        sample_text_no_nl = ('c', ["foo\n", "bar"])
 
191
        shas = []
 
192
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
 
193
            sha, _, _ = vf.add_lines(version, [], lines)
 
194
            shas.append(sha)
 
195
        # we now have a copy of all the lines in the vf.
 
196
        # is the test applicable to this vf implementation?
 
197
        try:
 
198
            vf.add_lines_with_ghosts('d', [], [])
 
199
        except NotImplementedError:
 
200
            raise TestSkipped("add_lines_with_ghosts is optional")
 
201
        for sha, (version, lines) in zip(
 
202
            shas, (empty_text, sample_text_nl, sample_text_no_nl)):
 
203
            self.assertRaises(errors.ExistingContent,
 
204
                vf.add_lines_with_ghosts, version + "2", [], lines,
 
205
                nostore_sha=sha)
 
206
            # and no new version should have been added.
 
207
            self.assertRaises(errors.RevisionNotPresent, vf.get_lines,
 
208
                version + "2")
 
209
 
 
210
    def test_add_lines_return_value(self):
 
211
        # add_lines should return the sha1 and the text size.
 
212
        vf = self.get_file()
 
213
        empty_text = ('a', [])
 
214
        sample_text_nl = ('b', ["foo\n", "bar\n"])
 
215
        sample_text_no_nl = ('c', ["foo\n", "bar"])
 
216
        # check results for the three cases:
 
217
        for version, lines in (empty_text, sample_text_nl, sample_text_no_nl):
 
218
            # the first two elements are the same for all versioned files:
 
219
            # - the digest and the size of the text. For some versioned files
 
220
            #   additional data is returned in additional tuple elements.
 
221
            result = vf.add_lines(version, [], lines)
 
222
            self.assertEqual(3, len(result))
 
223
            self.assertEqual((osutils.sha_strings(lines), sum(map(len, lines))),
 
224
                result[0:2])
 
225
        # parents should not affect the result:
 
226
        lines = sample_text_nl[1]
 
227
        self.assertEqual((osutils.sha_strings(lines), sum(map(len, lines))),
 
228
            vf.add_lines('d', ['b', 'c'], lines)[0:2])
 
229
 
 
230
    def test_get_reserved(self):
 
231
        vf = self.get_file()
 
232
        self.assertRaises(errors.ReservedId, vf.get_texts, ['b:'])
 
233
        self.assertRaises(errors.ReservedId, vf.get_lines, 'b:')
 
234
        self.assertRaises(errors.ReservedId, vf.get_text, 'b:')
 
235
 
 
236
    def test_make_mpdiffs(self):
 
237
        from bzrlib import multiparent
 
238
        vf = self.get_file('foo')
 
239
        sha1s = self._setup_for_deltas(vf)
 
240
        new_vf = self.get_file('bar')
 
241
        for version in multiparent.topo_iter(vf):
 
242
            mpdiff = vf.make_mpdiffs([version])[0]
 
243
            new_vf.add_mpdiffs([(version, vf.get_parents(version),
 
244
                                 vf.get_sha1(version), mpdiff)])
 
245
            self.assertEqualDiff(vf.get_text(version),
 
246
                                 new_vf.get_text(version))
 
247
 
 
248
    def _setup_for_deltas(self, f):
 
249
        self.assertFalse(f.has_version('base'))
 
250
        # add texts that should trip the knit maximum delta chain threshold
 
251
        # as well as doing parallel chains of data in knits.
 
252
        # this is done by two chains of 25 insertions
 
253
        f.add_lines('base', [], ['line\n'])
 
254
        f.add_lines('noeol', ['base'], ['line'])
 
255
        # detailed eol tests:
 
256
        # shared last line with parent no-eol
 
257
        f.add_lines('noeolsecond', ['noeol'], ['line\n', 'line'])
 
258
        # differing last line with parent, both no-eol
 
259
        f.add_lines('noeolnotshared', ['noeolsecond'], ['line\n', 'phone'])
 
260
        # add eol following a noneol parent, change content
 
261
        f.add_lines('eol', ['noeol'], ['phone\n'])
 
262
        # add eol following a noneol parent, no change content
 
263
        f.add_lines('eolline', ['noeol'], ['line\n'])
 
264
        # noeol with no parents:
 
265
        f.add_lines('noeolbase', [], ['line'])
 
266
        # noeol preceeding its leftmost parent in the output:
 
267
        # this is done by making it a merge of two parents with no common
 
268
        # anestry: noeolbase and noeol with the 
 
269
        # later-inserted parent the leftmost.
 
270
        f.add_lines('eolbeforefirstparent', ['noeolbase', 'noeol'], ['line'])
 
271
        # two identical eol texts
 
272
        f.add_lines('noeoldup', ['noeol'], ['line'])
 
273
        next_parent = 'base'
 
274
        text_name = 'chain1-'
 
275
        text = ['line\n']
 
276
        sha1s = {0 :'da6d3141cb4a5e6f464bf6e0518042ddc7bfd079',
 
277
                 1 :'45e21ea146a81ea44a821737acdb4f9791c8abe7',
 
278
                 2 :'e1f11570edf3e2a070052366c582837a4fe4e9fa',
 
279
                 3 :'26b4b8626da827088c514b8f9bbe4ebf181edda1',
 
280
                 4 :'e28a5510be25ba84d31121cff00956f9970ae6f6',
 
281
                 5 :'d63ec0ce22e11dcf65a931b69255d3ac747a318d',
 
282
                 6 :'2c2888d288cb5e1d98009d822fedfe6019c6a4ea',
 
283
                 7 :'95c14da9cafbf828e3e74a6f016d87926ba234ab',
 
284
                 8 :'779e9a0b28f9f832528d4b21e17e168c67697272',
 
285
                 9 :'1f8ff4e5c6ff78ac106fcfe6b1e8cb8740ff9a8f',
 
286
                 10:'131a2ae712cf51ed62f143e3fbac3d4206c25a05',
 
287
                 11:'c5a9d6f520d2515e1ec401a8f8a67e6c3c89f199',
 
288
                 12:'31a2286267f24d8bedaa43355f8ad7129509ea85',
 
289
                 13:'dc2a7fe80e8ec5cae920973973a8ee28b2da5e0a',
 
290
                 14:'2c4b1736566b8ca6051e668de68650686a3922f2',
 
291
                 15:'5912e4ecd9b0c07be4d013e7e2bdcf9323276cde',
 
292
                 16:'b0d2e18d3559a00580f6b49804c23fea500feab3',
 
293
                 17:'8e1d43ad72f7562d7cb8f57ee584e20eb1a69fc7',
 
294
                 18:'5cf64a3459ae28efa60239e44b20312d25b253f3',
 
295
                 19:'1ebed371807ba5935958ad0884595126e8c4e823',
 
296
                 20:'2aa62a8b06fb3b3b892a3292a068ade69d5ee0d3',
 
297
                 21:'01edc447978004f6e4e962b417a4ae1955b6fe5d',
 
298
                 22:'d8d8dc49c4bf0bab401e0298bb5ad827768618bb',
 
299
                 23:'c21f62b1c482862983a8ffb2b0c64b3451876e3f',
 
300
                 24:'c0593fe795e00dff6b3c0fe857a074364d5f04fc',
 
301
                 25:'dd1a1cf2ba9cc225c3aff729953e6364bf1d1855',
 
302
                 }
 
303
        for depth in range(26):
 
304
            new_version = text_name + '%s' % depth
 
305
            text = text + ['line\n']
 
306
            f.add_lines(new_version, [next_parent], text)
 
307
            next_parent = new_version
 
308
        next_parent = 'base'
 
309
        text_name = 'chain2-'
 
310
        text = ['line\n']
 
311
        for depth in range(26):
 
312
            new_version = text_name + '%s' % depth
 
313
            text = text + ['line\n']
 
314
            f.add_lines(new_version, [next_parent], text)
 
315
            next_parent = new_version
 
316
        return sha1s
 
317
 
 
318
    def test_ancestry(self):
 
319
        f = self.get_file()
 
320
        self.assertEqual([], f.get_ancestry([]))
 
321
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
322
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
323
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
324
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
325
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
326
        self.assertEqual([], f.get_ancestry([]))
 
327
        versions = f.get_ancestry(['rM'])
 
328
        # there are some possibilities:
 
329
        # r0 r1 r2 rM r3
 
330
        # r0 r1 r2 r3 rM
 
331
        # etc
 
332
        # so we check indexes
 
333
        r0 = versions.index('r0')
 
334
        r1 = versions.index('r1')
 
335
        r2 = versions.index('r2')
 
336
        self.assertFalse('r3' in versions)
 
337
        rM = versions.index('rM')
 
338
        self.assertTrue(r0 < r1)
 
339
        self.assertTrue(r0 < r2)
 
340
        self.assertTrue(r1 < rM)
 
341
        self.assertTrue(r2 < rM)
 
342
 
 
343
        self.assertRaises(RevisionNotPresent,
 
344
            f.get_ancestry, ['rM', 'rX'])
 
345
 
 
346
        self.assertEqual(set(f.get_ancestry('rM')),
 
347
            set(f.get_ancestry('rM', topo_sorted=False)))
 
348
 
 
349
    def test_mutate_after_finish(self):
 
350
        f = self.get_file()
 
351
        f.transaction_finished()
 
352
        self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
 
353
        self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
 
354
        self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
 
355
        self.assertRaises(errors.OutSideTransaction, f.join, '')
 
356
        self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
 
357
        
 
358
    def test_clear_cache(self):
 
359
        f = self.get_file()
 
360
        # on a new file it should not error
 
361
        f.clear_cache()
 
362
        # and after adding content, doing a clear_cache and a get should work.
 
363
        f.add_lines('0', [], ['a'])
 
364
        f.clear_cache()
 
365
        self.assertEqual(['a'], f.get_lines('0'))
 
366
 
 
367
    def test_clone_text(self):
 
368
        f = self.get_file()
 
369
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
370
        f.clone_text('r1', 'r0', ['r0'])
 
371
        def verify_file(f):
 
372
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
373
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
374
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
375
    
 
376
            self.assertRaises(RevisionNotPresent,
 
377
                f.clone_text, 'r2', 'rX', [])
 
378
            self.assertRaises(RevisionAlreadyPresent,
 
379
                f.clone_text, 'r1', 'r0', [])
 
380
        verify_file(f)
 
381
        verify_file(self.reopen_file())
 
382
 
 
383
    def test_create_empty(self):
 
384
        f = self.get_file()
 
385
        f.add_lines('0', [], ['a\n'])
 
386
        new_f = f.create_empty('t', MemoryTransport())
 
387
        # smoke test, specific types should check it is honoured correctly for
 
388
        # non type attributes
 
389
        self.assertEqual([], new_f.versions())
 
390
        self.assertTrue(isinstance(new_f, f.__class__))
 
391
 
 
392
    def test_copy_to(self):
 
393
        f = self.get_file()
 
394
        f.add_lines('0', [], ['a\n'])
 
395
        t = MemoryTransport()
 
396
        f.copy_to('foo', t)
 
397
        for suffix in f.__class__.get_suffixes():
 
398
            self.assertTrue(t.has('foo' + suffix))
 
399
 
 
400
    def test_get_suffixes(self):
 
401
        f = self.get_file()
 
402
        # should be the same
 
403
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
404
        # and should be a list
 
405
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
406
 
 
407
    def build_graph(self, file, graph):
 
408
        for node in topo_sort(graph.items()):
 
409
            file.add_lines(node, graph[node], [])
 
410
 
 
411
    def test_get_graph(self):
 
412
        f = self.get_file()
 
413
        graph = {
 
414
            'v1': (),
 
415
            'v2': ('v1', ),
 
416
            'v3': ('v2', )}
 
417
        self.build_graph(f, graph)
 
418
        self.assertEqual(graph, f.get_graph())
 
419
    
 
420
    def test_get_graph_partial(self):
 
421
        f = self.get_file()
 
422
        complex_graph = {}
 
423
        simple_a = {
 
424
            'c': (),
 
425
            'b': ('c', ),
 
426
            'a': ('b', ),
 
427
            }
 
428
        complex_graph.update(simple_a)
 
429
        simple_b = {
 
430
            'c': (),
 
431
            'b': ('c', ),
 
432
            }
 
433
        complex_graph.update(simple_b)
 
434
        simple_gam = {
 
435
            'c': (),
 
436
            'oo': (),
 
437
            'bar': ('oo', 'c'),
 
438
            'gam': ('bar', ),
 
439
            }
 
440
        complex_graph.update(simple_gam)
 
441
        simple_b_gam = {}
 
442
        simple_b_gam.update(simple_gam)
 
443
        simple_b_gam.update(simple_b)
 
444
        self.build_graph(f, complex_graph)
 
445
        self.assertEqual(simple_a, f.get_graph(['a']))
 
446
        self.assertEqual(simple_b, f.get_graph(['b']))
 
447
        self.assertEqual(simple_gam, f.get_graph(['gam']))
 
448
        self.assertEqual(simple_b_gam, f.get_graph(['b', 'gam']))
 
449
 
 
450
    def test_get_parents(self):
 
451
        f = self.get_file()
 
452
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
453
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
454
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
455
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
456
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
457
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
458
 
 
459
        self.assertRaises(RevisionNotPresent,
 
460
            f.get_parents, 'y')
 
461
 
 
462
    def test_annotate(self):
 
463
        f = self.get_file()
 
464
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
465
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
466
        origins = f.annotate('r1')
 
467
        self.assertEquals(origins[0][0], 'r1')
 
468
        self.assertEquals(origins[1][0], 'r0')
 
469
 
 
470
        self.assertRaises(RevisionNotPresent,
 
471
            f.annotate, 'foo')
 
472
 
 
473
    def test_detection(self):
 
474
        # Test weaves detect corruption.
 
475
        #
 
476
        # Weaves contain a checksum of their texts.
 
477
        # When a text is extracted, this checksum should be
 
478
        # verified.
 
479
 
 
480
        w = self.get_file_corrupted_text()
 
481
 
 
482
        self.assertEqual('hello\n', w.get_text('v1'))
 
483
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
484
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
485
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
486
 
 
487
        w = self.get_file_corrupted_checksum()
 
488
 
 
489
        self.assertEqual('hello\n', w.get_text('v1'))
 
490
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
491
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
492
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
493
 
 
494
    def get_file_corrupted_text(self):
 
495
        """Return a versioned file with corrupt text but valid metadata."""
 
496
        raise NotImplementedError(self.get_file_corrupted_text)
 
497
 
 
498
    def reopen_file(self, name='foo'):
 
499
        """Open the versioned file from disk again."""
 
500
        raise NotImplementedError(self.reopen_file)
 
501
 
 
502
    def test_iter_parents(self):
 
503
        """iter_parents returns the parents for many nodes."""
 
504
        f = self.get_file()
 
505
        # sample data:
 
506
        # no parents
 
507
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
508
        # 1 parents
 
509
        f.add_lines('r1', ['r0'], ['a\n', 'b\n'])
 
510
        # 2 parents
 
511
        f.add_lines('r2', ['r1', 'r0'], ['a\n', 'b\n'])
 
512
        # XXX TODO a ghost
 
513
        # cases: each sample data individually:
 
514
        self.assertEqual(set([('r0', ())]),
 
515
            set(f.iter_parents(['r0'])))
 
516
        self.assertEqual(set([('r1', ('r0', ))]),
 
517
            set(f.iter_parents(['r1'])))
 
518
        self.assertEqual(set([('r2', ('r1', 'r0'))]),
 
519
            set(f.iter_parents(['r2'])))
 
520
        # no nodes returned for a missing node
 
521
        self.assertEqual(set(),
 
522
            set(f.iter_parents(['missing'])))
 
523
        # 1 node returned with missing nodes skipped
 
524
        self.assertEqual(set([('r1', ('r0', ))]),
 
525
            set(f.iter_parents(['ghost1', 'r1', 'ghost'])))
 
526
        # 2 nodes returned
 
527
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
528
            set(f.iter_parents(['r0', 'r1'])))
 
529
        # 2 nodes returned, missing skipped
 
530
        self.assertEqual(set([('r0', ()), ('r1', ('r0', ))]),
 
531
            set(f.iter_parents(['a', 'r0', 'b', 'r1', 'c'])))
 
532
 
 
533
    def test_iter_lines_added_or_present_in_versions(self):
 
534
        # test that we get at least an equalset of the lines added by
 
535
        # versions in the weave 
 
536
        # the ordering here is to make a tree so that dumb searches have
 
537
        # more changes to muck up.
 
538
 
 
539
        class InstrumentedProgress(progress.DummyProgress):
 
540
 
 
541
            def __init__(self):
 
542
 
 
543
                progress.DummyProgress.__init__(self)
 
544
                self.updates = []
 
545
 
 
546
            def update(self, msg=None, current=None, total=None):
 
547
                self.updates.append((msg, current, total))
 
548
 
 
549
        vf = self.get_file()
 
550
        # add a base to get included
 
551
        vf.add_lines('base', [], ['base\n'])
 
552
        # add a ancestor to be included on one side
 
553
        vf.add_lines('lancestor', [], ['lancestor\n'])
 
554
        # add a ancestor to be included on the other side
 
555
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
 
556
        # add a child of rancestor with no eofile-nl
 
557
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
 
558
        # add a child of lancestor and base to join the two roots
 
559
        vf.add_lines('otherchild',
 
560
                     ['lancestor', 'base'],
 
561
                     ['base\n', 'lancestor\n', 'otherchild\n'])
 
562
        def iter_with_versions(versions, expected):
 
563
            # now we need to see what lines are returned, and how often.
 
564
            lines = {'base\n':0,
 
565
                     'lancestor\n':0,
 
566
                     'rancestor\n':0,
 
567
                     'child\n':0,
 
568
                     'otherchild\n':0,
 
569
                     }
 
570
            progress = InstrumentedProgress()
 
571
            # iterate over the lines
 
572
            for line in vf.iter_lines_added_or_present_in_versions(versions, 
 
573
                pb=progress):
 
574
                lines[line] += 1
 
575
            if []!= progress.updates: 
 
576
                self.assertEqual(expected, progress.updates)
 
577
            return lines
 
578
        lines = iter_with_versions(['child', 'otherchild'],
 
579
                                   [('Walking content.', 0, 2),
 
580
                                    ('Walking content.', 1, 2),
 
581
                                    ('Walking content.', 2, 2)])
 
582
        # we must see child and otherchild
 
583
        self.assertTrue(lines['child\n'] > 0)
 
584
        self.assertTrue(lines['otherchild\n'] > 0)
 
585
        # we dont care if we got more than that.
 
586
        
 
587
        # test all lines
 
588
        lines = iter_with_versions(None, [('Walking content.', 0, 5),
 
589
                                          ('Walking content.', 1, 5),
 
590
                                          ('Walking content.', 2, 5),
 
591
                                          ('Walking content.', 3, 5),
 
592
                                          ('Walking content.', 4, 5),
 
593
                                          ('Walking content.', 5, 5)])
 
594
        # all lines must be seen at least once
 
595
        self.assertTrue(lines['base\n'] > 0)
 
596
        self.assertTrue(lines['lancestor\n'] > 0)
 
597
        self.assertTrue(lines['rancestor\n'] > 0)
 
598
        self.assertTrue(lines['child\n'] > 0)
 
599
        self.assertTrue(lines['otherchild\n'] > 0)
 
600
 
 
601
    def test_fix_parents(self):
 
602
        # some versioned files allow incorrect parents to be corrected after
 
603
        # insertion - this may not fix ancestry..
 
604
        # if they do not supported, they just do not implement it.
 
605
        # we test this as an interface test to ensure that those that *do*
 
606
        # implementent it get it right.
 
607
        vf = self.get_file()
 
608
        vf.add_lines('notbase', [], [])
 
609
        vf.add_lines('base', [], [])
 
610
        try:
 
611
            vf.fix_parents('notbase', ['base'])
 
612
        except NotImplementedError:
 
613
            return
 
614
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
615
        # open again, check it stuck.
 
616
        vf = self.get_file()
 
617
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
618
 
 
619
    def test_fix_parents_with_ghosts(self):
 
620
        # when fixing parents, ghosts that are listed should not be ghosts
 
621
        # anymore.
 
622
        vf = self.get_file()
 
623
 
 
624
        try:
 
625
            vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
 
626
        except NotImplementedError:
 
627
            return
 
628
        vf.add_lines('base', [], [])
 
629
        vf.fix_parents('notbase', ['base', 'stillghost'])
 
630
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
631
        # open again, check it stuck.
 
632
        vf = self.get_file()
 
633
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
634
        # and check the ghosts
 
635
        self.assertEqual(['base', 'stillghost'],
 
636
                         vf.get_parents_with_ghosts('notbase'))
 
637
 
 
638
    def test_add_lines_with_ghosts(self):
 
639
        # some versioned file formats allow lines to be added with parent
 
640
        # information that is > than that in the format. Formats that do
 
641
        # not support this need to raise NotImplementedError on the
 
642
        # add_lines_with_ghosts api.
 
643
        vf = self.get_file()
 
644
        # add a revision with ghost parents
 
645
        # The preferred form is utf8, but we should translate when needed
 
646
        parent_id_unicode = u'b\xbfse'
 
647
        parent_id_utf8 = parent_id_unicode.encode('utf8')
 
648
        try:
 
649
            vf.add_lines_with_ghosts('notbxbfse', [parent_id_utf8], [])
 
650
        except NotImplementedError:
 
651
            # check the other ghost apis are also not implemented
 
652
            self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
 
653
            self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
 
654
            self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
 
655
            self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
 
656
            return
 
657
        vf = self.reopen_file()
 
658
        # test key graph related apis: getncestry, _graph, get_parents
 
659
        # has_version
 
660
        # - these are ghost unaware and must not be reflect ghosts
 
661
        self.assertEqual(['notbxbfse'], vf.get_ancestry('notbxbfse'))
 
662
        self.assertEqual([], vf.get_parents('notbxbfse'))
 
663
        self.assertEqual({'notbxbfse':()}, vf.get_graph())
 
664
        self.assertFalse(self.callDeprecated([osutils._revision_id_warning],
 
665
                         vf.has_version, parent_id_unicode))
 
666
        self.assertFalse(vf.has_version(parent_id_utf8))
 
667
        # we have _with_ghost apis to give us ghost information.
 
668
        self.assertEqual([parent_id_utf8, 'notbxbfse'], vf.get_ancestry_with_ghosts(['notbxbfse']))
 
669
        self.assertEqual([parent_id_utf8], vf.get_parents_with_ghosts('notbxbfse'))
 
670
        self.assertEqual({'notbxbfse':[parent_id_utf8]}, vf.get_graph_with_ghosts())
 
671
        self.assertTrue(self.callDeprecated([osutils._revision_id_warning],
 
672
                        vf.has_ghost, parent_id_unicode))
 
673
        self.assertTrue(vf.has_ghost(parent_id_utf8))
 
674
        # if we add something that is a ghost of another, it should correct the
 
675
        # results of the prior apis
 
676
        self.callDeprecated([osutils._revision_id_warning],
 
677
                            vf.add_lines, parent_id_unicode, [], [])
 
678
        self.assertEqual([parent_id_utf8, 'notbxbfse'], vf.get_ancestry(['notbxbfse']))
 
679
        self.assertEqual([parent_id_utf8], vf.get_parents('notbxbfse'))
 
680
        self.assertEqual({parent_id_utf8:(),
 
681
                          'notbxbfse':(parent_id_utf8, ),
 
682
                          },
 
683
                         vf.get_graph())
 
684
        self.assertTrue(self.callDeprecated([osutils._revision_id_warning],
 
685
                        vf.has_version, parent_id_unicode))
 
686
        self.assertTrue(vf.has_version(parent_id_utf8))
 
687
        # we have _with_ghost apis to give us ghost information.
 
688
        self.assertEqual([parent_id_utf8, 'notbxbfse'], vf.get_ancestry_with_ghosts(['notbxbfse']))
 
689
        self.assertEqual([parent_id_utf8], vf.get_parents_with_ghosts('notbxbfse'))
 
690
        self.assertEqual({parent_id_utf8:[],
 
691
                          'notbxbfse':[parent_id_utf8],
 
692
                          },
 
693
                         vf.get_graph_with_ghosts())
 
694
        self.assertFalse(self.callDeprecated([osutils._revision_id_warning],
 
695
                         vf.has_ghost, parent_id_unicode))
 
696
        self.assertFalse(vf.has_ghost(parent_id_utf8))
 
697
 
 
698
    def test_add_lines_with_ghosts_after_normal_revs(self):
 
699
        # some versioned file formats allow lines to be added with parent
 
700
        # information that is > than that in the format. Formats that do
 
701
        # not support this need to raise NotImplementedError on the
 
702
        # add_lines_with_ghosts api.
 
703
        vf = self.get_file()
 
704
        # probe for ghost support
 
705
        try:
 
706
            vf.has_ghost('hoo')
 
707
        except NotImplementedError:
 
708
            return
 
709
        vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
 
710
        vf.add_lines_with_ghosts('references_ghost',
 
711
                                 ['base', 'a_ghost'],
 
712
                                 ['line\n', 'line_b\n', 'line_c\n'])
 
713
        origins = vf.annotate('references_ghost')
 
714
        self.assertEquals(('base', 'line\n'), origins[0])
 
715
        self.assertEquals(('base', 'line_b\n'), origins[1])
 
716
        self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
 
717
 
 
718
    def test_readonly_mode(self):
 
719
        transport = get_transport(self.get_url('.'))
 
720
        factory = self.get_factory()
 
721
        vf = factory('id', transport, 0777, create=True, access_mode='w')
 
722
        vf = factory('id', transport, access_mode='r')
 
723
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
 
724
        self.assertRaises(errors.ReadOnlyError,
 
725
                          vf.add_lines_with_ghosts,
 
726
                          'base',
 
727
                          [],
 
728
                          [])
 
729
        self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
 
730
        self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
 
731
        self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
 
732
    
 
733
    def test_get_sha1(self):
 
734
        # check the sha1 data is available
 
735
        vf = self.get_file()
 
736
        # a simple file
 
737
        vf.add_lines('a', [], ['a\n'])
 
738
        # the same file, different metadata
 
739
        vf.add_lines('b', ['a'], ['a\n'])
 
740
        # a file differing only in last newline.
 
741
        vf.add_lines('c', [], ['a'])
 
742
        self.assertEqual(
 
743
            '3f786850e387550fdab836ed7e6dc881de23001b', vf.get_sha1('a'))
 
744
        self.assertEqual(
 
745
            '3f786850e387550fdab836ed7e6dc881de23001b', vf.get_sha1('b'))
 
746
        self.assertEqual(
 
747
            '86f7e437faa5a7fce15d1ddcb9eaeaea377667b8', vf.get_sha1('c'))
 
748
 
 
749
        self.assertEqual(['3f786850e387550fdab836ed7e6dc881de23001b',
 
750
                          '86f7e437faa5a7fce15d1ddcb9eaeaea377667b8',
 
751
                          '3f786850e387550fdab836ed7e6dc881de23001b'],
 
752
                          vf.get_sha1s(['a', 'c', 'b']))
 
753
        
 
754
 
 
755
class TestWeave(TestCaseWithMemoryTransport, VersionedFileTestMixIn):
 
756
 
 
757
    def get_file(self, name='foo'):
 
758
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
759
 
 
760
    def get_file_corrupted_text(self):
 
761
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
762
        w.add_lines('v1', [], ['hello\n'])
 
763
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
764
        
 
765
        # We are going to invasively corrupt the text
 
766
        # Make sure the internals of weave are the same
 
767
        self.assertEqual([('{', 0)
 
768
                        , 'hello\n'
 
769
                        , ('}', None)
 
770
                        , ('{', 1)
 
771
                        , 'there\n'
 
772
                        , ('}', None)
 
773
                        ], w._weave)
 
774
        
 
775
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
776
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
777
                        ], w._sha1s)
 
778
        w.check()
 
779
        
 
780
        # Corrupted
 
781
        w._weave[4] = 'There\n'
 
782
        return w
 
783
 
 
784
    def get_file_corrupted_checksum(self):
 
785
        w = self.get_file_corrupted_text()
 
786
        # Corrected
 
787
        w._weave[4] = 'there\n'
 
788
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
789
        
 
790
        #Invalid checksum, first digit changed
 
791
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
792
        return w
 
793
 
 
794
    def reopen_file(self, name='foo', create=False):
 
795
        return WeaveFile(name, get_transport(self.get_url('.')), create=create)
 
796
 
 
797
    def test_no_implicit_create(self):
 
798
        self.assertRaises(errors.NoSuchFile,
 
799
                          WeaveFile,
 
800
                          'foo',
 
801
                          get_transport(self.get_url('.')))
 
802
 
 
803
    def get_factory(self):
 
804
        return WeaveFile
 
805
 
 
806
 
 
807
class TestKnit(TestCaseWithMemoryTransport, VersionedFileTestMixIn):
 
808
 
 
809
    def get_file(self, name='foo'):
 
810
        return self.get_factory()(name, get_transport(self.get_url('.')),
 
811
                                  delta=True, create=True)
 
812
 
 
813
    def get_factory(self):
 
814
        return KnitVersionedFile
 
815
 
 
816
    def get_file_corrupted_text(self):
 
817
        knit = self.get_file()
 
818
        knit.add_lines('v1', [], ['hello\n'])
 
819
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
820
        return knit
 
821
 
 
822
    def reopen_file(self, name='foo', create=False):
 
823
        return self.get_factory()(name, get_transport(self.get_url('.')),
 
824
            delta=True,
 
825
            create=create)
 
826
 
 
827
    def test_detection(self):
 
828
        knit = self.get_file()
 
829
        knit.check()
 
830
 
 
831
    def test_no_implicit_create(self):
 
832
        self.assertRaises(errors.NoSuchFile,
 
833
                          KnitVersionedFile,
 
834
                          'foo',
 
835
                          get_transport(self.get_url('.')))
 
836
 
 
837
 
 
838
class TestPlaintextKnit(TestKnit):
 
839
    """Test a knit with no cached annotations"""
 
840
 
 
841
    def _factory(self, name, transport, file_mode=None, access_mode=None,
 
842
                 delta=True, create=False):
 
843
        return KnitVersionedFile(name, transport, file_mode, access_mode,
 
844
                                 KnitPlainFactory(), delta=delta,
 
845
                                 create=create)
 
846
 
 
847
    def get_factory(self):
 
848
        return self._factory
 
849
 
 
850
 
 
851
class InterString(versionedfile.InterVersionedFile):
 
852
    """An inter-versionedfile optimised code path for strings.
 
853
 
 
854
    This is for use during testing where we use strings as versionedfiles
 
855
    so that none of the default regsitered interversionedfile classes will
 
856
    match - which lets us test the match logic.
 
857
    """
 
858
 
 
859
    @staticmethod
 
860
    def is_compatible(source, target):
 
861
        """InterString is compatible with strings-as-versionedfiles."""
 
862
        return isinstance(source, str) and isinstance(target, str)
 
863
 
 
864
 
 
865
# TODO this and the InterRepository core logic should be consolidatable
 
866
# if we make the registry a separate class though we still need to 
 
867
# test the behaviour in the active registry to catch failure-to-handle-
 
868
# stange-objects
 
869
class TestInterVersionedFile(TestCaseWithMemoryTransport):
 
870
 
 
871
    def test_get_default_inter_versionedfile(self):
 
872
        # test that the InterVersionedFile.get(a, b) probes
 
873
        # for a class where is_compatible(a, b) returns
 
874
        # true and returns a default interversionedfile otherwise.
 
875
        # This also tests that the default registered optimised interversionedfile
 
876
        # classes do not barf inappropriately when a surprising versionedfile type
 
877
        # is handed to them.
 
878
        dummy_a = "VersionedFile 1."
 
879
        dummy_b = "VersionedFile 2."
 
880
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
881
 
 
882
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
883
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
884
        inter = versionedfile.InterVersionedFile.get(a, b)
 
885
        self.assertEqual(versionedfile.InterVersionedFile,
 
886
                         inter.__class__)
 
887
        self.assertEqual(a, inter.source)
 
888
        self.assertEqual(b, inter.target)
 
889
 
 
890
    def test_register_inter_versionedfile_class(self):
 
891
        # test that a optimised code path provider - a
 
892
        # InterVersionedFile subclass can be registered and unregistered
 
893
        # and that it is correctly selected when given a versionedfile
 
894
        # pair that it returns true on for the is_compatible static method
 
895
        # check
 
896
        dummy_a = "VersionedFile 1."
 
897
        dummy_b = "VersionedFile 2."
 
898
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
899
        try:
 
900
            # we should get the default for something InterString returns False
 
901
            # to
 
902
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
903
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
904
            # and we should get an InterString for a pair it 'likes'
 
905
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
906
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
907
            self.assertEqual(InterString, inter.__class__)
 
908
            self.assertEqual(dummy_a, inter.source)
 
909
            self.assertEqual(dummy_b, inter.target)
 
910
        finally:
 
911
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
912
        # now we should get the default InterVersionedFile object again.
 
913
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
914
 
 
915
 
 
916
class TestReadonlyHttpMixin(object):
 
917
 
 
918
    def test_readonly_http_works(self):
 
919
        # we should be able to read from http with a versioned file.
 
920
        vf = self.get_file()
 
921
        # try an empty file access
 
922
        readonly_vf = self.get_factory()('foo', get_transport(self.get_readonly_url('.')))
 
923
        self.assertEqual([], readonly_vf.versions())
 
924
        # now with feeling.
 
925
        vf.add_lines('1', [], ['a\n'])
 
926
        vf.add_lines('2', ['1'], ['b\n', 'a\n'])
 
927
        readonly_vf = self.get_factory()('foo', get_transport(self.get_readonly_url('.')))
 
928
        self.assertEqual(['1', '2'], vf.versions())
 
929
        for version in readonly_vf.versions():
 
930
            readonly_vf.get_lines(version)
 
931
 
 
932
 
 
933
class TestWeaveHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
 
934
 
 
935
    def get_file(self):
 
936
        return WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
937
 
 
938
    def get_factory(self):
 
939
        return WeaveFile
 
940
 
 
941
 
 
942
class TestKnitHTTP(TestCaseWithWebserver, TestReadonlyHttpMixin):
 
943
 
 
944
    def get_file(self):
 
945
        return KnitVersionedFile('foo', get_transport(self.get_url('.')),
 
946
                                 delta=True, create=True)
 
947
 
 
948
    def get_factory(self):
 
949
        return KnitVersionedFile
 
950
 
 
951
 
 
952
class MergeCasesMixin(object):
 
953
 
 
954
    def doMerge(self, base, a, b, mp):
 
955
        from cStringIO import StringIO
 
956
        from textwrap import dedent
 
957
 
 
958
        def addcrlf(x):
 
959
            return x + '\n'
 
960
        
 
961
        w = self.get_file()
 
962
        w.add_lines('text0', [], map(addcrlf, base))
 
963
        w.add_lines('text1', ['text0'], map(addcrlf, a))
 
964
        w.add_lines('text2', ['text0'], map(addcrlf, b))
 
965
 
 
966
        self.log_contents(w)
 
967
 
 
968
        self.log('merge plan:')
 
969
        p = list(w.plan_merge('text1', 'text2'))
 
970
        for state, line in p:
 
971
            if line:
 
972
                self.log('%12s | %s' % (state, line[:-1]))
 
973
 
 
974
        self.log('merge:')
 
975
        mt = StringIO()
 
976
        mt.writelines(w.weave_merge(p))
 
977
        mt.seek(0)
 
978
        self.log(mt.getvalue())
 
979
 
 
980
        mp = map(addcrlf, mp)
 
981
        self.assertEqual(mt.readlines(), mp)
 
982
        
 
983
        
 
984
    def testOneInsert(self):
 
985
        self.doMerge([],
 
986
                     ['aa'],
 
987
                     [],
 
988
                     ['aa'])
 
989
 
 
990
    def testSeparateInserts(self):
 
991
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
992
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
993
                     ['aaa', 'bbb', 'yyy', 'ccc'],
 
994
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
995
 
 
996
    def testSameInsert(self):
 
997
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
998
                     ['aaa', 'xxx', 'bbb', 'ccc'],
 
999
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'],
 
1000
                     ['aaa', 'xxx', 'bbb', 'yyy', 'ccc'])
 
1001
    overlappedInsertExpected = ['aaa', 'xxx', 'yyy', 'bbb']
 
1002
    def testOverlappedInsert(self):
 
1003
        self.doMerge(['aaa', 'bbb'],
 
1004
                     ['aaa', 'xxx', 'yyy', 'bbb'],
 
1005
                     ['aaa', 'xxx', 'bbb'], self.overlappedInsertExpected)
 
1006
 
 
1007
        # really it ought to reduce this to 
 
1008
        # ['aaa', 'xxx', 'yyy', 'bbb']
 
1009
 
 
1010
 
 
1011
    def testClashReplace(self):
 
1012
        self.doMerge(['aaa'],
 
1013
                     ['xxx'],
 
1014
                     ['yyy', 'zzz'],
 
1015
                     ['<<<<<<< ', 'xxx', '=======', 'yyy', 'zzz', 
 
1016
                      '>>>>>>> '])
 
1017
 
 
1018
    def testNonClashInsert1(self):
 
1019
        self.doMerge(['aaa'],
 
1020
                     ['xxx', 'aaa'],
 
1021
                     ['yyy', 'zzz'],
 
1022
                     ['<<<<<<< ', 'xxx', 'aaa', '=======', 'yyy', 'zzz', 
 
1023
                      '>>>>>>> '])
 
1024
 
 
1025
    def testNonClashInsert2(self):
 
1026
        self.doMerge(['aaa'],
 
1027
                     ['aaa'],
 
1028
                     ['yyy', 'zzz'],
 
1029
                     ['yyy', 'zzz'])
 
1030
 
 
1031
 
 
1032
    def testDeleteAndModify(self):
 
1033
        """Clashing delete and modification.
 
1034
 
 
1035
        If one side modifies a region and the other deletes it then
 
1036
        there should be a conflict with one side blank.
 
1037
        """
 
1038
 
 
1039
        #######################################
 
1040
        # skippd, not working yet
 
1041
        return
 
1042
        
 
1043
        self.doMerge(['aaa', 'bbb', 'ccc'],
 
1044
                     ['aaa', 'ddd', 'ccc'],
 
1045
                     ['aaa', 'ccc'],
 
1046
                     ['<<<<<<<< ', 'aaa', '=======', '>>>>>>> ', 'ccc'])
 
1047
 
 
1048
    def _test_merge_from_strings(self, base, a, b, expected):
 
1049
        w = self.get_file()
 
1050
        w.add_lines('text0', [], base.splitlines(True))
 
1051
        w.add_lines('text1', ['text0'], a.splitlines(True))
 
1052
        w.add_lines('text2', ['text0'], b.splitlines(True))
 
1053
        self.log('merge plan:')
 
1054
        p = list(w.plan_merge('text1', 'text2'))
 
1055
        for state, line in p:
 
1056
            if line:
 
1057
                self.log('%12s | %s' % (state, line[:-1]))
 
1058
        self.log('merge result:')
 
1059
        result_text = ''.join(w.weave_merge(p))
 
1060
        self.log(result_text)
 
1061
        self.assertEqualDiff(result_text, expected)
 
1062
 
 
1063
    def test_weave_merge_conflicts(self):
 
1064
        # does weave merge properly handle plans that end with unchanged?
 
1065
        result = ''.join(self.get_file().weave_merge([('new-a', 'hello\n')]))
 
1066
        self.assertEqual(result, 'hello\n')
 
1067
 
 
1068
    def test_deletion_extended(self):
 
1069
        """One side deletes, the other deletes more.
 
1070
        """
 
1071
        base = """\
 
1072
            line 1
 
1073
            line 2
 
1074
            line 3
 
1075
            """
 
1076
        a = """\
 
1077
            line 1
 
1078
            line 2
 
1079
            """
 
1080
        b = """\
 
1081
            line 1
 
1082
            """
 
1083
        result = """\
 
1084
            line 1
 
1085
            """
 
1086
        self._test_merge_from_strings(base, a, b, result)
 
1087
 
 
1088
    def test_deletion_overlap(self):
 
1089
        """Delete overlapping regions with no other conflict.
 
1090
 
 
1091
        Arguably it'd be better to treat these as agreement, rather than 
 
1092
        conflict, but for now conflict is safer.
 
1093
        """
 
1094
        base = """\
 
1095
            start context
 
1096
            int a() {}
 
1097
            int b() {}
 
1098
            int c() {}
 
1099
            end context
 
1100
            """
 
1101
        a = """\
 
1102
            start context
 
1103
            int a() {}
 
1104
            end context
 
1105
            """
 
1106
        b = """\
 
1107
            start context
 
1108
            int c() {}
 
1109
            end context
 
1110
            """
 
1111
        result = """\
 
1112
            start context
 
1113
<<<<<<< 
 
1114
            int a() {}
 
1115
=======
 
1116
            int c() {}
 
1117
>>>>>>> 
 
1118
            end context
 
1119
            """
 
1120
        self._test_merge_from_strings(base, a, b, result)
 
1121
 
 
1122
    def test_agreement_deletion(self):
 
1123
        """Agree to delete some lines, without conflicts."""
 
1124
        base = """\
 
1125
            start context
 
1126
            base line 1
 
1127
            base line 2
 
1128
            end context
 
1129
            """
 
1130
        a = """\
 
1131
            start context
 
1132
            base line 1
 
1133
            end context
 
1134
            """
 
1135
        b = """\
 
1136
            start context
 
1137
            base line 1
 
1138
            end context
 
1139
            """
 
1140
        result = """\
 
1141
            start context
 
1142
            base line 1
 
1143
            end context
 
1144
            """
 
1145
        self._test_merge_from_strings(base, a, b, result)
 
1146
 
 
1147
    def test_sync_on_deletion(self):
 
1148
        """Specific case of merge where we can synchronize incorrectly.
 
1149
        
 
1150
        A previous version of the weave merge concluded that the two versions
 
1151
        agreed on deleting line 2, and this could be a synchronization point.
 
1152
        Line 1 was then considered in isolation, and thought to be deleted on 
 
1153
        both sides.
 
1154
 
 
1155
        It's better to consider the whole thing as a disagreement region.
 
1156
        """
 
1157
        base = """\
 
1158
            start context
 
1159
            base line 1
 
1160
            base line 2
 
1161
            end context
 
1162
            """
 
1163
        a = """\
 
1164
            start context
 
1165
            base line 1
 
1166
            a's replacement line 2
 
1167
            end context
 
1168
            """
 
1169
        b = """\
 
1170
            start context
 
1171
            b replaces
 
1172
            both lines
 
1173
            end context
 
1174
            """
 
1175
        result = """\
 
1176
            start context
 
1177
<<<<<<< 
 
1178
            base line 1
 
1179
            a's replacement line 2
 
1180
=======
 
1181
            b replaces
 
1182
            both lines
 
1183
>>>>>>> 
 
1184
            end context
 
1185
            """
 
1186
        self._test_merge_from_strings(base, a, b, result)
 
1187
 
 
1188
 
 
1189
class TestKnitMerge(TestCaseWithMemoryTransport, MergeCasesMixin):
 
1190
 
 
1191
    def get_file(self, name='foo'):
 
1192
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
1193
                                 delta=True, create=True)
 
1194
 
 
1195
    def log_contents(self, w):
 
1196
        pass
 
1197
 
 
1198
 
 
1199
class TestWeaveMerge(TestCaseWithMemoryTransport, MergeCasesMixin):
 
1200
 
 
1201
    def get_file(self, name='foo'):
 
1202
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
1203
 
 
1204
    def log_contents(self, w):
 
1205
        self.log('weave is:')
 
1206
        tmpf = StringIO()
 
1207
        write_weave(w, tmpf)
 
1208
        self.log(tmpf.getvalue())
 
1209
 
 
1210
    overlappedInsertExpected = ['aaa', '<<<<<<< ', 'xxx', 'yyy', '=======', 
 
1211
                                'xxx', '>>>>>>> ', 'bbb']
 
1212
 
 
1213
 
 
1214
class TestFormatSignatures(TestCaseWithMemoryTransport):
 
1215
 
 
1216
    def get_knit_file(self, name, annotated):
 
1217
        if annotated:
 
1218
            factory = KnitAnnotateFactory()
 
1219
        else:
 
1220
            factory = KnitPlainFactory()
 
1221
        return KnitVersionedFile(
 
1222
            name, get_transport(self.get_url('.')), create=True,
 
1223
            factory=factory)
 
1224
 
 
1225
    def test_knit_format_signatures(self):
 
1226
        """Different formats of knit have different signature strings."""
 
1227
        knit = self.get_knit_file('a', True)
 
1228
        self.assertEqual('knit-annotated', knit.get_format_signature())
 
1229
        knit = self.get_knit_file('p', False)
 
1230
        self.assertEqual('knit-plain', knit.get_format_signature())
 
1231