~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Martin Pool
  • Date: 2006-03-10 06:29:53 UTC
  • mfrom: (1608 +trunk)
  • mto: This revision was merged to the branch mainline in revision 1611.
  • Revision ID: mbp@sourcefrog.net-20060310062953-bc1c7ade75c89a7a
[merge] bzr.dev; pycurl not updated for readv yet

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (C) 2005 by Canonical Ltd
 
2
#
 
3
# Authors:
 
4
#   Johan Rydberg <jrydberg@gnu.org>
 
5
#
 
6
# This program is free software; you can redistribute it and/or modify
 
7
# it under the terms of the GNU General Public License as published by
 
8
# the Free Software Foundation; either version 2 of the License, or
 
9
# (at your option) any later version.
 
10
 
 
11
# This program is distributed in the hope that it will be useful,
 
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
# GNU General Public License for more details.
 
15
 
 
16
# You should have received a copy of the GNU General Public License
 
17
# along with this program; if not, write to the Free Software
 
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
19
 
 
20
 
 
21
import bzrlib
 
22
import bzrlib.errors as errors
 
23
from bzrlib.errors import (
 
24
                           RevisionNotPresent, 
 
25
                           RevisionAlreadyPresent,
 
26
                           WeaveParentMismatch
 
27
                           )
 
28
from bzrlib.knit import KnitVersionedFile, \
 
29
     KnitAnnotateFactory
 
30
from bzrlib.tests import TestCaseWithTransport
 
31
from bzrlib.trace import mutter
 
32
from bzrlib.transport import get_transport
 
33
from bzrlib.transport.memory import MemoryTransport
 
34
import bzrlib.versionedfile as versionedfile
 
35
from bzrlib.weave import WeaveFile
 
36
from bzrlib.weavefile import read_weave
 
37
 
 
38
 
 
39
class VersionedFileTestMixIn(object):
 
40
    """A mixin test class for testing VersionedFiles.
 
41
 
 
42
    This is not an adaptor-style test at this point because
 
43
    theres no dynamic substitution of versioned file implementations,
 
44
    they are strictly controlled by their owning repositories.
 
45
    """
 
46
 
 
47
    def test_add(self):
 
48
        f = self.get_file()
 
49
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
50
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
51
        def verify_file(f):
 
52
            versions = f.versions()
 
53
            self.assertTrue('r0' in versions)
 
54
            self.assertTrue('r1' in versions)
 
55
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
 
56
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
 
57
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
 
58
            self.assertEqual(2, len(f))
 
59
            self.assertEqual(2, f.num_versions())
 
60
    
 
61
            self.assertRaises(RevisionNotPresent,
 
62
                f.add_lines, 'r2', ['foo'], [])
 
63
            self.assertRaises(RevisionAlreadyPresent,
 
64
                f.add_lines, 'r1', [], [])
 
65
        verify_file(f)
 
66
        f = self.reopen_file()
 
67
        verify_file(f)
 
68
 
 
69
    def test_ancestry(self):
 
70
        f = self.get_file()
 
71
        self.assertEqual([], f.get_ancestry([]))
 
72
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
73
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
 
74
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
 
75
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
 
76
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
 
77
        self.assertEqual([], f.get_ancestry([]))
 
78
        versions = f.get_ancestry(['rM'])
 
79
        # there are some possibilities:
 
80
        # r0 r1 r2 rM r3
 
81
        # r0 r1 r2 r3 rM
 
82
        # etc
 
83
        # so we check indexes
 
84
        r0 = versions.index('r0')
 
85
        r1 = versions.index('r1')
 
86
        r2 = versions.index('r2')
 
87
        self.assertFalse('r3' in versions)
 
88
        rM = versions.index('rM')
 
89
        self.assertTrue(r0 < r1)
 
90
        self.assertTrue(r0 < r2)
 
91
        self.assertTrue(r1 < rM)
 
92
        self.assertTrue(r2 < rM)
 
93
 
 
94
        self.assertRaises(RevisionNotPresent,
 
95
            f.get_ancestry, ['rM', 'rX'])
 
96
 
 
97
    def test_mutate_after_finish(self):
 
98
        f = self.get_file()
 
99
        f.transaction_finished()
 
100
        self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
 
101
        self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
 
102
        self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
 
103
        self.assertRaises(errors.OutSideTransaction, f.join, '')
 
104
        self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
 
105
        
 
106
    def test_clear_cache(self):
 
107
        f = self.get_file()
 
108
        # on a new file it should not error
 
109
        f.clear_cache()
 
110
        # and after adding content, doing a clear_cache and a get should work.
 
111
        f.add_lines('0', [], ['a'])
 
112
        f.clear_cache()
 
113
        self.assertEqual(['a'], f.get_lines('0'))
 
114
 
 
115
    def test_clone_text(self):
 
116
        f = self.get_file()
 
117
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
118
        f.clone_text('r1', 'r0', ['r0'])
 
119
        def verify_file(f):
 
120
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
 
121
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
 
122
            self.assertEquals(f.get_parents('r1'), ['r0'])
 
123
    
 
124
            self.assertRaises(RevisionNotPresent,
 
125
                f.clone_text, 'r2', 'rX', [])
 
126
            self.assertRaises(RevisionAlreadyPresent,
 
127
                f.clone_text, 'r1', 'r0', [])
 
128
        verify_file(f)
 
129
        verify_file(self.reopen_file())
 
130
 
 
131
    def test_create_empty(self):
 
132
        f = self.get_file()
 
133
        f.add_lines('0', [], ['a\n'])
 
134
        new_f = f.create_empty('t', MemoryTransport())
 
135
        # smoke test, specific types should check it is honoured correctly for
 
136
        # non type attributes
 
137
        self.assertEqual([], new_f.versions())
 
138
        self.assertTrue(isinstance(new_f, f.__class__))
 
139
 
 
140
    def test_copy_to(self):
 
141
        f = self.get_file()
 
142
        f.add_lines('0', [], ['a\n'])
 
143
        t = MemoryTransport()
 
144
        f.copy_to('foo', t)
 
145
        for suffix in f.__class__.get_suffixes():
 
146
            self.assertTrue(t.has('foo' + suffix))
 
147
 
 
148
    def test_get_suffixes(self):
 
149
        f = self.get_file()
 
150
        # should be the same
 
151
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
 
152
        # and should be a list
 
153
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
 
154
 
 
155
    def test_get_graph(self):
 
156
        f = self.get_file()
 
157
        f.add_lines('v1', [], ['hello\n'])
 
158
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
 
159
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
 
160
        self.assertEqual({'v1': [],
 
161
                          'v2': ['v1'],
 
162
                          'v3': ['v2']},
 
163
                         f.get_graph())
 
164
 
 
165
    def test_get_parents(self):
 
166
        f = self.get_file()
 
167
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
168
        f.add_lines('r1', [], ['a\n', 'b\n'])
 
169
        f.add_lines('r2', [], ['a\n', 'b\n'])
 
170
        f.add_lines('r3', [], ['a\n', 'b\n'])
 
171
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
 
172
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
 
173
 
 
174
        self.assertRaises(RevisionNotPresent,
 
175
            f.get_parents, 'y')
 
176
 
 
177
    def test_annotate(self):
 
178
        f = self.get_file()
 
179
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
180
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
181
        origins = f.annotate('r1')
 
182
        self.assertEquals(origins[0][0], 'r1')
 
183
        self.assertEquals(origins[1][0], 'r0')
 
184
 
 
185
        self.assertRaises(RevisionNotPresent,
 
186
            f.annotate, 'foo')
 
187
 
 
188
    def test_walk(self):
 
189
        # tests that walk returns all the inclusions for the requested
 
190
        # revisions as well as the revisions changes themselves.
 
191
        f = self.get_file('1')
 
192
        f.add_lines('r0', [], ['a\n', 'b\n'])
 
193
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
 
194
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
 
195
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
 
196
 
 
197
        lines = {}
 
198
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
 
199
            lines[text] = (insert, dset)
 
200
 
 
201
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
 
202
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
 
203
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
 
204
        self.assertTrue(lines['d\n'], ('rX', set([])))
 
205
        self.assertTrue(lines['e\n'], ('rY', set([])))
 
206
 
 
207
    def test_detection(self):
 
208
        # Test weaves detect corruption.
 
209
        #
 
210
        # Weaves contain a checksum of their texts.
 
211
        # When a text is extracted, this checksum should be
 
212
        # verified.
 
213
 
 
214
        w = self.get_file_corrupted_text()
 
215
 
 
216
        self.assertEqual('hello\n', w.get_text('v1'))
 
217
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
218
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
219
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
220
 
 
221
        w = self.get_file_corrupted_checksum()
 
222
 
 
223
        self.assertEqual('hello\n', w.get_text('v1'))
 
224
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
 
225
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
 
226
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
 
227
 
 
228
    def get_file_corrupted_text(self):
 
229
        """Return a versioned file with corrupt text but valid metadata."""
 
230
        raise NotImplementedError(self.get_file_corrupted_text)
 
231
 
 
232
    def reopen_file(self, name='foo'):
 
233
        """Open the versioned file from disk again."""
 
234
        raise NotImplementedError(self.reopen_file)
 
235
 
 
236
    def test_iter_lines_added_or_present_in_versions(self):
 
237
        # test that we get at least an equalset of the lines added by
 
238
        # versions in the weave 
 
239
        # the ordering here is to make a tree so that dumb searches have
 
240
        # more changes to muck up.
 
241
        vf = self.get_file()
 
242
        # add a base to get included
 
243
        vf.add_lines('base', [], ['base\n'])
 
244
        # add a ancestor to be included on one side
 
245
        vf.add_lines('lancestor', [], ['lancestor\n'])
 
246
        # add a ancestor to be included on the other side
 
247
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
 
248
        # add a child of rancestor with no eofile-nl
 
249
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
 
250
        # add a child of lancestor and base to join the two roots
 
251
        vf.add_lines('otherchild',
 
252
                     ['lancestor', 'base'],
 
253
                     ['base\n', 'lancestor\n', 'otherchild\n'])
 
254
        def iter_with_versions(versions):
 
255
            # now we need to see what lines are returned, and how often.
 
256
            lines = {'base\n':0,
 
257
                     'lancestor\n':0,
 
258
                     'rancestor\n':0,
 
259
                     'child\n':0,
 
260
                     'otherchild\n':0,
 
261
                     }
 
262
            # iterate over the lines
 
263
            for line in vf.iter_lines_added_or_present_in_versions(versions):
 
264
                lines[line] += 1
 
265
            return lines
 
266
        lines = iter_with_versions(['child', 'otherchild'])
 
267
        # we must see child and otherchild
 
268
        self.assertTrue(lines['child\n'] > 0)
 
269
        self.assertTrue(lines['otherchild\n'] > 0)
 
270
        # we dont care if we got more than that.
 
271
        
 
272
        # test all lines
 
273
        lines = iter_with_versions(None)
 
274
        # all lines must be seen at least once
 
275
        self.assertTrue(lines['base\n'] > 0)
 
276
        self.assertTrue(lines['lancestor\n'] > 0)
 
277
        self.assertTrue(lines['rancestor\n'] > 0)
 
278
        self.assertTrue(lines['child\n'] > 0)
 
279
        self.assertTrue(lines['otherchild\n'] > 0)
 
280
 
 
281
    def test_fix_parents(self):
 
282
        # some versioned files allow incorrect parents to be corrected after
 
283
        # insertion - this may not fix ancestry..
 
284
        # if they do not supported, they just do not implement it.
 
285
        # we test this as an interface test to ensure that those that *do*
 
286
        # implementent it get it right.
 
287
        vf = self.get_file()
 
288
        vf.add_lines('notbase', [], [])
 
289
        vf.add_lines('base', [], [])
 
290
        try:
 
291
            vf.fix_parents('notbase', ['base'])
 
292
        except NotImplementedError:
 
293
            return
 
294
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
295
        # open again, check it stuck.
 
296
        vf = self.get_file()
 
297
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
298
 
 
299
    def test_fix_parents_with_ghosts(self):
 
300
        # when fixing parents, ghosts that are listed should not be ghosts
 
301
        # anymore.
 
302
        vf = self.get_file()
 
303
 
 
304
        try:
 
305
            vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
 
306
        except NotImplementedError:
 
307
            return
 
308
        vf.add_lines('base', [], [])
 
309
        vf.fix_parents('notbase', ['base', 'stillghost'])
 
310
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
311
        # open again, check it stuck.
 
312
        vf = self.get_file()
 
313
        self.assertEqual(['base'], vf.get_parents('notbase'))
 
314
        # and check the ghosts
 
315
        self.assertEqual(['base', 'stillghost'],
 
316
                         vf.get_parents_with_ghosts('notbase'))
 
317
 
 
318
    def test_add_lines_with_ghosts(self):
 
319
        # some versioned file formats allow lines to be added with parent
 
320
        # information that is > than that in the format. Formats that do
 
321
        # not support this need to raise NotImplementedError on the
 
322
        # add_lines_with_ghosts api.
 
323
        vf = self.get_file()
 
324
        # add a revision with ghost parents
 
325
        try:
 
326
            vf.add_lines_with_ghosts(u'notbxbfse', [u'b\xbfse'], [])
 
327
        except NotImplementedError:
 
328
            # check the other ghost apis are also not implemented
 
329
            self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
 
330
            self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
 
331
            self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
 
332
            self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
 
333
            return
 
334
        # test key graph related apis: getncestry, _graph, get_parents
 
335
        # has_version
 
336
        # - these are ghost unaware and must not be reflect ghosts
 
337
        self.assertEqual([u'notbxbfse'], vf.get_ancestry(u'notbxbfse'))
 
338
        self.assertEqual([], vf.get_parents(u'notbxbfse'))
 
339
        self.assertEqual({u'notbxbfse':[]}, vf.get_graph())
 
340
        self.assertFalse(vf.has_version(u'b\xbfse'))
 
341
        # we have _with_ghost apis to give us ghost information.
 
342
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
 
343
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
 
344
        self.assertEqual({u'notbxbfse':[u'b\xbfse']}, vf.get_graph_with_ghosts())
 
345
        self.assertTrue(vf.has_ghost(u'b\xbfse'))
 
346
        # if we add something that is a ghost of another, it should correct the
 
347
        # results of the prior apis
 
348
        vf.add_lines(u'b\xbfse', [], [])
 
349
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry([u'notbxbfse']))
 
350
        self.assertEqual([u'b\xbfse'], vf.get_parents(u'notbxbfse'))
 
351
        self.assertEqual({u'b\xbfse':[],
 
352
                          u'notbxbfse':[u'b\xbfse'],
 
353
                          },
 
354
                         vf.get_graph())
 
355
        self.assertTrue(vf.has_version(u'b\xbfse'))
 
356
        # we have _with_ghost apis to give us ghost information.
 
357
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
 
358
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
 
359
        self.assertEqual({u'b\xbfse':[],
 
360
                          u'notbxbfse':[u'b\xbfse'],
 
361
                          },
 
362
                         vf.get_graph_with_ghosts())
 
363
        self.assertFalse(vf.has_ghost(u'b\xbfse'))
 
364
 
 
365
    def test_add_lines_with_ghosts_after_normal_revs(self):
 
366
        # some versioned file formats allow lines to be added with parent
 
367
        # information that is > than that in the format. Formats that do
 
368
        # not support this need to raise NotImplementedError on the
 
369
        # add_lines_with_ghosts api.
 
370
        vf = self.get_file()
 
371
        # probe for ghost support
 
372
        try:
 
373
            vf.has_ghost('hoo')
 
374
        except NotImplementedError:
 
375
            return
 
376
        vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
 
377
        vf.add_lines_with_ghosts('references_ghost',
 
378
                                 ['base', 'a_ghost'],
 
379
                                 ['line\n', 'line_b\n', 'line_c\n'])
 
380
        origins = vf.annotate('references_ghost')
 
381
        self.assertEquals(('base', 'line\n'), origins[0])
 
382
        self.assertEquals(('base', 'line_b\n'), origins[1])
 
383
        self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
 
384
 
 
385
    def test_readonly_mode(self):
 
386
        transport = get_transport(self.get_url('.'))
 
387
        factory = self.get_factory()
 
388
        vf = factory('id', transport, 0777, create=True, access_mode='w')
 
389
        vf = factory('id', transport, access_mode='r')
 
390
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
 
391
        self.assertRaises(errors.ReadOnlyError,
 
392
                          vf.add_lines_with_ghosts,
 
393
                          'base',
 
394
                          [],
 
395
                          [])
 
396
        self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
 
397
        self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
 
398
        self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
 
399
        
 
400
 
 
401
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
 
402
 
 
403
    def get_file(self, name='foo'):
 
404
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
 
405
 
 
406
    def get_file_corrupted_text(self):
 
407
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
 
408
        w.add_lines('v1', [], ['hello\n'])
 
409
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
410
        
 
411
        # We are going to invasively corrupt the text
 
412
        # Make sure the internals of weave are the same
 
413
        self.assertEqual([('{', 0)
 
414
                        , 'hello\n'
 
415
                        , ('}', None)
 
416
                        , ('{', 1)
 
417
                        , 'there\n'
 
418
                        , ('}', None)
 
419
                        ], w._weave)
 
420
        
 
421
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
 
422
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
423
                        ], w._sha1s)
 
424
        w.check()
 
425
        
 
426
        # Corrupted
 
427
        w._weave[4] = 'There\n'
 
428
        return w
 
429
 
 
430
    def get_file_corrupted_checksum(self):
 
431
        w = self.get_file_corrupted_text()
 
432
        # Corrected
 
433
        w._weave[4] = 'there\n'
 
434
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
 
435
        
 
436
        #Invalid checksum, first digit changed
 
437
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
 
438
        return w
 
439
 
 
440
    def reopen_file(self, name='foo'):
 
441
        return WeaveFile(name, get_transport(self.get_url('.')))
 
442
 
 
443
    def test_no_implicit_create(self):
 
444
        self.assertRaises(errors.NoSuchFile,
 
445
                          WeaveFile,
 
446
                          'foo',
 
447
                          get_transport(self.get_url('.')))
 
448
 
 
449
    def get_factory(self):
 
450
        return WeaveFile
 
451
 
 
452
 
 
453
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
 
454
 
 
455
    def get_file(self, name='foo'):
 
456
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
 
457
                                 delta=True, create=True)
 
458
 
 
459
    def get_factory(self):
 
460
        return KnitVersionedFile
 
461
 
 
462
    def get_file_corrupted_text(self):
 
463
        knit = self.get_file()
 
464
        knit.add_lines('v1', [], ['hello\n'])
 
465
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
 
466
        return knit
 
467
 
 
468
    def reopen_file(self, name='foo'):
 
469
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
 
470
 
 
471
    def test_detection(self):
 
472
        print "TODO for merging: create a corrupted knit."
 
473
        knit = self.get_file()
 
474
        knit.check()
 
475
 
 
476
    def test_no_implicit_create(self):
 
477
        self.assertRaises(errors.NoSuchFile,
 
478
                          KnitVersionedFile,
 
479
                          'foo',
 
480
                          get_transport(self.get_url('.')))
 
481
 
 
482
 
 
483
class InterString(versionedfile.InterVersionedFile):
 
484
    """An inter-versionedfile optimised code path for strings.
 
485
 
 
486
    This is for use during testing where we use strings as versionedfiles
 
487
    so that none of the default regsitered interversionedfile classes will
 
488
    match - which lets us test the match logic.
 
489
    """
 
490
 
 
491
    @staticmethod
 
492
    def is_compatible(source, target):
 
493
        """InterString is compatible with strings-as-versionedfiles."""
 
494
        return isinstance(source, str) and isinstance(target, str)
 
495
 
 
496
 
 
497
# TODO this and the InterRepository core logic should be consolidatable
 
498
# if we make the registry a separate class though we still need to 
 
499
# test the behaviour in the active registry to catch failure-to-handle-
 
500
# stange-objects
 
501
class TestInterVersionedFile(TestCaseWithTransport):
 
502
 
 
503
    def test_get_default_inter_versionedfile(self):
 
504
        # test that the InterVersionedFile.get(a, b) probes
 
505
        # for a class where is_compatible(a, b) returns
 
506
        # true and returns a default interversionedfile otherwise.
 
507
        # This also tests that the default registered optimised interversionedfile
 
508
        # classes do not barf inappropriately when a surprising versionedfile type
 
509
        # is handed to them.
 
510
        dummy_a = "VersionedFile 1."
 
511
        dummy_b = "VersionedFile 2."
 
512
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
 
513
 
 
514
    def assertGetsDefaultInterVersionedFile(self, a, b):
 
515
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
 
516
        inter = versionedfile.InterVersionedFile.get(a, b)
 
517
        self.assertEqual(versionedfile.InterVersionedFile,
 
518
                         inter.__class__)
 
519
        self.assertEqual(a, inter.source)
 
520
        self.assertEqual(b, inter.target)
 
521
 
 
522
    def test_register_inter_versionedfile_class(self):
 
523
        # test that a optimised code path provider - a
 
524
        # InterVersionedFile subclass can be registered and unregistered
 
525
        # and that it is correctly selected when given a versionedfile
 
526
        # pair that it returns true on for the is_compatible static method
 
527
        # check
 
528
        dummy_a = "VersionedFile 1."
 
529
        dummy_b = "VersionedFile 2."
 
530
        versionedfile.InterVersionedFile.register_optimiser(InterString)
 
531
        try:
 
532
            # we should get the default for something InterString returns False
 
533
            # to
 
534
            self.assertFalse(InterString.is_compatible(dummy_a, None))
 
535
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
 
536
            # and we should get an InterString for a pair it 'likes'
 
537
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
 
538
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
 
539
            self.assertEqual(InterString, inter.__class__)
 
540
            self.assertEqual(dummy_a, inter.source)
 
541
            self.assertEqual(dummy_b, inter.target)
 
542
        finally:
 
543
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
 
544
        # now we should get the default InterVersionedFile object again.
 
545
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)