~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

Got merge and revert using nested pbs

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport import get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
import bzrlib.versionedfile as versionedfile
 
35
from bzrlib.weave import WeaveFile
 
36
from bzrlib.weavefile import read_weave
 
37
 
 
38
 
 
39
class VersionedFileTestMixIn(object):
 
40
    """A mixin test class for testing VersionedFiles.
 
41
 
 
42
    This is not an adaptor-style test at this point because
 
43
    theres no dynamic substitution of versioned file implementations,
 
44
    they are strictly controlled by their owning repositories.
 
45
    """
 
46
 
 
47
    def test_add(self):
 
48
        f = self.get_file()
 
49
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
50
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
51
        def verify_file(f):
 
52
            versions = f.versions()
 
53
            self.assertTrue('r0' in versions)
 
54
            self.assertTrue('r1' in versions)
 
55
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
56
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
57
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
58
            self.assertEqual(2, len(f))
 
59
            self.assertEqual(2, f.num_versions())
 
60
    
 
61
            self.assertRaises(RevisionNotPresent,
 
62
                f.add_lines, 'r2', ['foo'], [])
 
63
            self.assertRaises(RevisionAlreadyPresent,
 
64
                f.add_lines, 'r1', [], [])
 
65
        verify_file(f)
 
66
        f = self.reopen_file()
 
67
        verify_file(f)
 
68
 
 
69
    def test_ancestry(self):
 
70
        f = self.get_file()
 
71
        self.assertEqual([], f.get_ancestry([]))
 
72
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
73
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
74
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
75
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
76
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
77
        self.assertEqual([], f.get_ancestry([]))
 
78
        versions = f.get_ancestry(['rM'])
 
79
        # there are some possibilities:
 
80
        # r0 r1 r2 rM r3
 
81
        # r0 r1 r2 r3 rM
 
82
        # etc
 
83
        # so we check indexes
 
84
        r0 = versions.index('r0')
 
85
        r1 = versions.index('r1')
 
86
        r2 = versions.index('r2')
 
87
        self.assertFalse('r3' in versions)
 
88
        rM = versions.index('rM')
 
89
        self.assertTrue(r0 < r1)
 
90
        self.assertTrue(r0 < r2)
 
91
        self.assertTrue(r1 < rM)
 
92
        self.assertTrue(r2 < rM)
 
93
 
 
94
        self.assertRaises(RevisionNotPresent,
 
95
            f.get_ancestry, ['rM', 'rX'])
 
96
        
 
97
    def test_clear_cache(self):
 
98
        f = self.get_file()
 
99
        # on a new file it should not error
 
100
        f.clear_cache()
 
101
        # and after adding content, doing a clear_cache and a get should work.
 
102
        f.add_lines('0', [], ['a'])
 
103
        f.clear_cache()
 
104
        self.assertEqual(['a'], f.get_lines('0'))
 
105
 
 
106
    def test_clone_text(self):
 
107
        f = self.get_file()
 
108
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
109
        f.clone_text('r1', 'r0', ['r0'])
 
110
        def verify_file(f):
 
111
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
112
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
113
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
114
    
 
115
            self.assertRaises(RevisionNotPresent,
 
116
                f.clone_text, 'r2', 'rX', [])
 
117
            self.assertRaises(RevisionAlreadyPresent,
 
118
                f.clone_text, 'r1', 'r0', [])
 
119
        verify_file(f)
 
120
        verify_file(self.reopen_file())
 
121
 
 
122
    def test_create_empty(self):
 
123
        f = self.get_file()
 
124
        f.add_lines('0', [], ['a\n'])
 
125
        new_f = f.create_empty('t', MemoryTransport())
 
126
        # smoke test, specific types should check it is honoured correctly for
 
127
        # non type attributes
 
128
        self.assertEqual([], new_f.versions())
 
129
        self.assertTrue(isinstance(new_f, f.__class__))
 
130
 
 
131
    def test_copy_to(self):
 
132
        f = self.get_file()
 
133
        f.add_lines('0', [], ['a\n'])
 
134
        t = MemoryTransport()
 
135
        f.copy_to('foo', t)
 
136
        for suffix in f.__class__.get_suffixes():
 
137
            self.assertTrue(t.has('foo' + suffix))
 
138
 
 
139
    def test_get_suffixes(self):
 
140
        f = self.get_file()
 
141
        # should be the same
 
142
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
143
        # and should be a list
 
144
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
145
 
 
146
    def test_get_graph(self):
 
147
        f = self.get_file()
 
148
        f.add_lines('v1', [], ['hello\n'])
 
149
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
150
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
151
        self.assertEqual({'v1': [],
 
152
                          'v2': ['v1'],
 
153
                          'v3': ['v2']},
 
154
                         f.get_graph())
 
155
 
 
156
    def test_get_parents(self):
 
157
        f = self.get_file()
 
158
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
159
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
160
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
161
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
162
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
163
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
164
 
 
165
        self.assertRaises(RevisionNotPresent,
 
166
            f.get_parents, 'y')
 
167
 
 
168
    def test_annotate(self):
 
169
        f = self.get_file()
 
170
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
171
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
172
        origins = f.annotate('r1')
 
173
        self.assertEquals(origins[0][0], 'r1')
 
174
        self.assertEquals(origins[1][0], 'r0')
 
175
 
 
176
        self.assertRaises(RevisionNotPresent,
 
177
            f.annotate, 'foo')
 
178
 
 
179
    def test_walk(self):
 
180
        # tests that walk returns all the inclusions for the requested
 
181
        # revisions as well as the revisions changes themselves.
 
182
        f = self.get_file('1')
 
183
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
184
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
185
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
186
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
187
 
 
188
        lines = {}
 
189
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
190
            lines[text] = (insert, dset)
 
191
 
 
192
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
193
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
194
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
195
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
196
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
197
 
 
198
    def test_detection(self):
 
199
        # Test weaves detect corruption.
 
200
        #
 
201
        # Weaves contain a checksum of their texts.
 
202
        # When a text is extracted, this checksum should be
 
203
        # verified.
 
204
 
 
205
        w = self.get_file_corrupted_text()
 
206
 
 
207
        self.assertEqual('hello\n', w.get_text('v1'))
 
208
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
209
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
210
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
211
 
 
212
        w = self.get_file_corrupted_checksum()
 
213
 
 
214
        self.assertEqual('hello\n', w.get_text('v1'))
 
215
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
216
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
217
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
218
 
 
219
    def get_file_corrupted_text(self):
 
220
        """Return a versioned file with corrupt text but valid metadata."""
 
221
        raise NotImplementedError(self.get_file_corrupted_text)
 
222
 
 
223
    def reopen_file(self, name='foo'):
 
224
        """Open the versioned file from disk again."""
 
225
        raise NotImplementedError(self.reopen_file)
 
226
 
 
227
 
 
228
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
229
 
 
230
    def get_file(self, name='foo'):
 
231
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
232
 
 
233
    def get_file_corrupted_text(self):
 
234
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
235
        w.add_lines('v1', [], ['hello\n'])
 
236
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
237
        
 
238
        # We are going to invasively corrupt the text
 
239
        # Make sure the internals of weave are the same
 
240
        self.assertEqual([('{', 0)
 
241
                        , 'hello\n'
 
242
                        , ('}', None)
 
243
                        , ('{', 1)
 
244
                        , 'there\n'
 
245
                        , ('}', None)
 
246
                        ], w._weave)
 
247
        
 
248
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
249
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
250
                        ], w._sha1s)
 
251
        w.check()
 
252
        
 
253
        # Corrupted
 
254
        w._weave[4] = 'There\n'
 
255
        return w
 
256
 
 
257
    def get_file_corrupted_checksum(self):
 
258
        w = self.get_file_corrupted_text()
 
259
        # Corrected
 
260
        w._weave[4] = 'there\n'
 
261
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
262
        
 
263
        #Invalid checksum, first digit changed
 
264
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
265
        return w
 
266
 
 
267
    def reopen_file(self, name='foo'):
 
268
        return WeaveFile(name, get_transport(self.get_url('.')))
 
269
 
 
270
    def test_no_implicit_create(self):
 
271
        self.assertRaises(errors.NoSuchFile,
 
272
                          WeaveFile,
 
273
                          'foo',
 
274
                          get_transport(self.get_url('.')))
 
275
 
 
276
 
 
277
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
278
 
 
279
    def get_file(self, name='foo'):
 
280
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
281
                                 delta=True, create=True)
 
282
 
 
283
    def get_file_corrupted_text(self):
 
284
        knit = self.get_file()
 
285
        knit.add_lines('v1', [], ['hello\n'])
 
286
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
287
        return knit
 
288
 
 
289
    def reopen_file(self, name='foo'):
 
290
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
291
 
 
292
    def test_detection(self):
 
293
        print "TODO for merging: create a corrupted knit."
 
294
        knit = self.get_file()
 
295
        knit.check()
 
296
 
 
297
    def test_no_implicit_create(self):
 
298
        self.assertRaises(errors.NoSuchFile,
 
299
                          KnitVersionedFile,
 
300
                          'foo',
 
301
                          get_transport(self.get_url('.')))
 
302
 
 
303
 
 
304
class InterString(versionedfile.InterVersionedFile):
 
305
    """An inter-versionedfile optimised code path for strings.
 
306
 
 
307
    This is for use during testing where we use strings as versionedfiles
 
308
    so that none of the default regsitered interversionedfile classes will
 
309
    match - which lets us test the match logic.
 
310
    """
 
311
 
 
312
    @staticmethod
 
313
    def is_compatible(source, target):
 
314
        """InterString is compatible with strings-as-versionedfiles."""
 
315
        return isinstance(source, str) and isinstance(target, str)
 
316
 
 
317
 
 
318
# TODO this and the InterRepository core logic should be consolidatable
 
319
# if we make the registry a separate class though we still need to 
 
320
# test the behaviour in the active registry to catch failure-to-handle-
 
321
# stange-objects
 
322
class TestInterVersionedFile(TestCaseWithTransport):
 
323
 
 
324
    def test_get_default_inter_versionedfile(self):
 
325
        # test that the InterVersionedFile.get(a, b) probes
 
326
        # for a class where is_compatible(a, b) returns
 
327
        # true and returns a default interversionedfile otherwise.
 
328
        # This also tests that the default registered optimised interversionedfile
 
329
        # classes do not barf inappropriately when a surprising versionedfile type
 
330
        # is handed to them.
 
331
        dummy_a = "VersionedFile 1."
 
332
        dummy_b = "VersionedFile 2."
 
333
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
334
 
 
335
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
336
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
337
        inter = versionedfile.InterVersionedFile.get(a, b)
 
338
        self.assertEqual(versionedfile.InterVersionedFile,
 
339
                         inter.__class__)
 
340
        self.assertEqual(a, inter.source)
 
341
        self.assertEqual(b, inter.target)
 
342
 
 
343
    def test_register_inter_versionedfile_class(self):
 
344
        # test that a optimised code path provider - a
 
345
        # InterVersionedFile subclass can be registered and unregistered
 
346
        # and that it is correctly selected when given a versionedfile
 
347
        # pair that it returns true on for the is_compatible static method
 
348
        # check
 
349
        dummy_a = "VersionedFile 1."
 
350
        dummy_b = "VersionedFile 2."
 
351
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
352
        try:
 
353
            # we should get the default for something InterString returns False
 
354
            # to
 
355
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
356
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
357
            # and we should get an InterString for a pair it 'likes'
 
358
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
359
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
360
            self.assertEqual(InterString, inter.__class__)
 
361
            self.assertEqual(dummy_a, inter.source)
 
362
            self.assertEqual(dummy_b, inter.target)
 
363
        finally:
 
364
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
365
        # now we should get the default InterVersionedFile object again.
 
366
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
367
 
 
368