~bzr-pqm/bzr/bzr.dev

« back to all changes in this revision

Viewing changes to bzrlib/tests/test_versionedfile.py

  • Committer: Martin Pool
  • Date: 2005-09-12 09:50:44 UTC
  • Revision ID: mbp@sourcefrog.net-20050912095044-6acfdb5611729987
- no tests in bzrlib.fetch anymore

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (C) 2005 by Canonical Ltd
2
 
#
3
 
# Authors:
4
 
#   Johan Rydberg <jrydberg@gnu.org>
5
 
#
6
 
# This program is free software; you can redistribute it and/or modify
7
 
# it under the terms of the GNU General Public License as published by
8
 
# the Free Software Foundation; either version 2 of the License, or
9
 
# (at your option) any later version.
10
 
 
11
 
# This program is distributed in the hope that it will be useful,
12
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 
# GNU General Public License for more details.
15
 
 
16
 
# You should have received a copy of the GNU General Public License
17
 
# along with this program; if not, write to the Free Software
18
 
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19
 
 
20
 
 
21
 
import bzrlib
22
 
import bzrlib.errors as errors
23
 
from bzrlib.errors import (
24
 
                           RevisionNotPresent, 
25
 
                           RevisionAlreadyPresent,
26
 
                           WeaveParentMismatch
27
 
                           )
28
 
from bzrlib.knit import KnitVersionedFile, \
29
 
     KnitAnnotateFactory
30
 
from bzrlib.tests import TestCaseWithTransport
31
 
from bzrlib.trace import mutter
32
 
from bzrlib.transport import get_transport
33
 
from bzrlib.transport.memory import MemoryTransport
34
 
import bzrlib.versionedfile as versionedfile
35
 
from bzrlib.weave import WeaveFile
36
 
from bzrlib.weavefile import read_weave
37
 
 
38
 
 
39
 
class VersionedFileTestMixIn(object):
40
 
    """A mixin test class for testing VersionedFiles.
41
 
 
42
 
    This is not an adaptor-style test at this point because
43
 
    theres no dynamic substitution of versioned file implementations,
44
 
    they are strictly controlled by their owning repositories.
45
 
    """
46
 
 
47
 
    def test_add(self):
48
 
        f = self.get_file()
49
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
50
 
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
51
 
        def verify_file(f):
52
 
            versions = f.versions()
53
 
            self.assertTrue('r0' in versions)
54
 
            self.assertTrue('r1' in versions)
55
 
            self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
56
 
            self.assertEquals(f.get_text('r0'), 'a\nb\n')
57
 
            self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
58
 
            self.assertEqual(2, len(f))
59
 
            self.assertEqual(2, f.num_versions())
60
 
    
61
 
            self.assertRaises(RevisionNotPresent,
62
 
                f.add_lines, 'r2', ['foo'], [])
63
 
            self.assertRaises(RevisionAlreadyPresent,
64
 
                f.add_lines, 'r1', [], [])
65
 
        verify_file(f)
66
 
        f = self.reopen_file()
67
 
        verify_file(f)
68
 
 
69
 
    def test_ancestry(self):
70
 
        f = self.get_file()
71
 
        self.assertEqual([], f.get_ancestry([]))
72
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
73
 
        f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
74
 
        f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
75
 
        f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
76
 
        f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
77
 
        self.assertEqual([], f.get_ancestry([]))
78
 
        versions = f.get_ancestry(['rM'])
79
 
        # there are some possibilities:
80
 
        # r0 r1 r2 rM r3
81
 
        # r0 r1 r2 r3 rM
82
 
        # etc
83
 
        # so we check indexes
84
 
        r0 = versions.index('r0')
85
 
        r1 = versions.index('r1')
86
 
        r2 = versions.index('r2')
87
 
        self.assertFalse('r3' in versions)
88
 
        rM = versions.index('rM')
89
 
        self.assertTrue(r0 < r1)
90
 
        self.assertTrue(r0 < r2)
91
 
        self.assertTrue(r1 < rM)
92
 
        self.assertTrue(r2 < rM)
93
 
 
94
 
        self.assertRaises(RevisionNotPresent,
95
 
            f.get_ancestry, ['rM', 'rX'])
96
 
 
97
 
    def test_mutate_after_finish(self):
98
 
        f = self.get_file()
99
 
        f.transaction_finished()
100
 
        self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
101
 
        self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
102
 
        self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
103
 
        self.assertRaises(errors.OutSideTransaction, f.join, '')
104
 
        self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
105
 
        
106
 
    def test_clear_cache(self):
107
 
        f = self.get_file()
108
 
        # on a new file it should not error
109
 
        f.clear_cache()
110
 
        # and after adding content, doing a clear_cache and a get should work.
111
 
        f.add_lines('0', [], ['a'])
112
 
        f.clear_cache()
113
 
        self.assertEqual(['a'], f.get_lines('0'))
114
 
 
115
 
    def test_clone_text(self):
116
 
        f = self.get_file()
117
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
118
 
        f.clone_text('r1', 'r0', ['r0'])
119
 
        def verify_file(f):
120
 
            self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
121
 
            self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
122
 
            self.assertEquals(f.get_parents('r1'), ['r0'])
123
 
    
124
 
            self.assertRaises(RevisionNotPresent,
125
 
                f.clone_text, 'r2', 'rX', [])
126
 
            self.assertRaises(RevisionAlreadyPresent,
127
 
                f.clone_text, 'r1', 'r0', [])
128
 
        verify_file(f)
129
 
        verify_file(self.reopen_file())
130
 
 
131
 
    def test_create_empty(self):
132
 
        f = self.get_file()
133
 
        f.add_lines('0', [], ['a\n'])
134
 
        new_f = f.create_empty('t', MemoryTransport())
135
 
        # smoke test, specific types should check it is honoured correctly for
136
 
        # non type attributes
137
 
        self.assertEqual([], new_f.versions())
138
 
        self.assertTrue(isinstance(new_f, f.__class__))
139
 
 
140
 
    def test_copy_to(self):
141
 
        f = self.get_file()
142
 
        f.add_lines('0', [], ['a\n'])
143
 
        t = MemoryTransport()
144
 
        f.copy_to('foo', t)
145
 
        for suffix in f.__class__.get_suffixes():
146
 
            self.assertTrue(t.has('foo' + suffix))
147
 
 
148
 
    def test_get_suffixes(self):
149
 
        f = self.get_file()
150
 
        # should be the same
151
 
        self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
152
 
        # and should be a list
153
 
        self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
154
 
 
155
 
    def test_get_graph(self):
156
 
        f = self.get_file()
157
 
        f.add_lines('v1', [], ['hello\n'])
158
 
        f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
159
 
        f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
160
 
        self.assertEqual({'v1': [],
161
 
                          'v2': ['v1'],
162
 
                          'v3': ['v2']},
163
 
                         f.get_graph())
164
 
 
165
 
    def test_get_parents(self):
166
 
        f = self.get_file()
167
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
168
 
        f.add_lines('r1', [], ['a\n', 'b\n'])
169
 
        f.add_lines('r2', [], ['a\n', 'b\n'])
170
 
        f.add_lines('r3', [], ['a\n', 'b\n'])
171
 
        f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
172
 
        self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
173
 
 
174
 
        self.assertRaises(RevisionNotPresent,
175
 
            f.get_parents, 'y')
176
 
 
177
 
    def test_annotate(self):
178
 
        f = self.get_file()
179
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
180
 
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
181
 
        origins = f.annotate('r1')
182
 
        self.assertEquals(origins[0][0], 'r1')
183
 
        self.assertEquals(origins[1][0], 'r0')
184
 
 
185
 
        self.assertRaises(RevisionNotPresent,
186
 
            f.annotate, 'foo')
187
 
 
188
 
    def test_walk(self):
189
 
        # tests that walk returns all the inclusions for the requested
190
 
        # revisions as well as the revisions changes themselves.
191
 
        f = self.get_file('1')
192
 
        f.add_lines('r0', [], ['a\n', 'b\n'])
193
 
        f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
194
 
        f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
195
 
        f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
196
 
 
197
 
        lines = {}
198
 
        for lineno, insert, dset, text in f.walk(['rX', 'rY']):
199
 
            lines[text] = (insert, dset)
200
 
 
201
 
        self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
202
 
        self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
203
 
        self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
204
 
        self.assertTrue(lines['d\n'], ('rX', set([])))
205
 
        self.assertTrue(lines['e\n'], ('rY', set([])))
206
 
 
207
 
    def test_detection(self):
208
 
        # Test weaves detect corruption.
209
 
        #
210
 
        # Weaves contain a checksum of their texts.
211
 
        # When a text is extracted, this checksum should be
212
 
        # verified.
213
 
 
214
 
        w = self.get_file_corrupted_text()
215
 
 
216
 
        self.assertEqual('hello\n', w.get_text('v1'))
217
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
218
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
219
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
220
 
 
221
 
        w = self.get_file_corrupted_checksum()
222
 
 
223
 
        self.assertEqual('hello\n', w.get_text('v1'))
224
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
225
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
226
 
        self.assertRaises(errors.WeaveInvalidChecksum, w.check)
227
 
 
228
 
    def get_file_corrupted_text(self):
229
 
        """Return a versioned file with corrupt text but valid metadata."""
230
 
        raise NotImplementedError(self.get_file_corrupted_text)
231
 
 
232
 
    def reopen_file(self, name='foo'):
233
 
        """Open the versioned file from disk again."""
234
 
        raise NotImplementedError(self.reopen_file)
235
 
 
236
 
    def test_iter_lines_added_or_present_in_versions(self):
237
 
        # test that we get at least an equalset of the lines added by
238
 
        # versions in the weave 
239
 
        # the ordering here is to make a tree so that dumb searches have
240
 
        # more changes to muck up.
241
 
        vf = self.get_file()
242
 
        # add a base to get included
243
 
        vf.add_lines('base', [], ['base\n'])
244
 
        # add a ancestor to be included on one side
245
 
        vf.add_lines('lancestor', [], ['lancestor\n'])
246
 
        # add a ancestor to be included on the other side
247
 
        vf.add_lines('rancestor', ['base'], ['rancestor\n'])
248
 
        # add a child of rancestor with no eofile-nl
249
 
        vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
250
 
        # add a child of lancestor and base to join the two roots
251
 
        vf.add_lines('otherchild',
252
 
                     ['lancestor', 'base'],
253
 
                     ['base\n', 'lancestor\n', 'otherchild\n'])
254
 
        def iter_with_versions(versions):
255
 
            # now we need to see what lines are returned, and how often.
256
 
            lines = {'base\n':0,
257
 
                     'lancestor\n':0,
258
 
                     'rancestor\n':0,
259
 
                     'child\n':0,
260
 
                     'otherchild\n':0,
261
 
                     }
262
 
            # iterate over the lines
263
 
            for line in vf.iter_lines_added_or_present_in_versions(versions):
264
 
                lines[line] += 1
265
 
            return lines
266
 
        lines = iter_with_versions(['child', 'otherchild'])
267
 
        # we must see child and otherchild
268
 
        self.assertTrue(lines['child\n'] > 0)
269
 
        self.assertTrue(lines['otherchild\n'] > 0)
270
 
        # we dont care if we got more than that.
271
 
        
272
 
        # test all lines
273
 
        lines = iter_with_versions(None)
274
 
        # all lines must be seen at least once
275
 
        self.assertTrue(lines['base\n'] > 0)
276
 
        self.assertTrue(lines['lancestor\n'] > 0)
277
 
        self.assertTrue(lines['rancestor\n'] > 0)
278
 
        self.assertTrue(lines['child\n'] > 0)
279
 
        self.assertTrue(lines['otherchild\n'] > 0)
280
 
 
281
 
    def test_fix_parents(self):
282
 
        # some versioned files allow incorrect parents to be corrected after
283
 
        # insertion - this may not fix ancestry..
284
 
        # if they do not supported, they just do not implement it.
285
 
        # we test this as an interface test to ensure that those that *do*
286
 
        # implementent it get it right.
287
 
        vf = self.get_file()
288
 
        vf.add_lines('notbase', [], [])
289
 
        vf.add_lines('base', [], [])
290
 
        try:
291
 
            vf.fix_parents('notbase', ['base'])
292
 
        except NotImplementedError:
293
 
            return
294
 
        self.assertEqual(['base'], vf.get_parents('notbase'))
295
 
        # open again, check it stuck.
296
 
        vf = self.get_file()
297
 
        self.assertEqual(['base'], vf.get_parents('notbase'))
298
 
 
299
 
    def test_fix_parents_with_ghosts(self):
300
 
        # when fixing parents, ghosts that are listed should not be ghosts
301
 
        # anymore.
302
 
        vf = self.get_file()
303
 
 
304
 
        try:
305
 
            vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
306
 
        except NotImplementedError:
307
 
            return
308
 
        vf.add_lines('base', [], [])
309
 
        vf.fix_parents('notbase', ['base', 'stillghost'])
310
 
        self.assertEqual(['base'], vf.get_parents('notbase'))
311
 
        # open again, check it stuck.
312
 
        vf = self.get_file()
313
 
        self.assertEqual(['base'], vf.get_parents('notbase'))
314
 
        # and check the ghosts
315
 
        self.assertEqual(['base', 'stillghost'],
316
 
                         vf.get_parents_with_ghosts('notbase'))
317
 
 
318
 
    def test_add_lines_with_ghosts(self):
319
 
        # some versioned file formats allow lines to be added with parent
320
 
        # information that is > than that in the format. Formats that do
321
 
        # not support this need to raise NotImplementedError on the
322
 
        # add_lines_with_ghosts api.
323
 
        vf = self.get_file()
324
 
        # add a revision with ghost parents
325
 
        try:
326
 
            vf.add_lines_with_ghosts(u'notbxbfse', [u'b\xbfse'], [])
327
 
        except NotImplementedError:
328
 
            # check the other ghost apis are also not implemented
329
 
            self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
330
 
            self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
331
 
            self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
332
 
            self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
333
 
            return
334
 
        # test key graph related apis: getncestry, _graph, get_parents
335
 
        # has_version
336
 
        # - these are ghost unaware and must not be reflect ghosts
337
 
        self.assertEqual([u'notbxbfse'], vf.get_ancestry(u'notbxbfse'))
338
 
        self.assertEqual([], vf.get_parents(u'notbxbfse'))
339
 
        self.assertEqual({u'notbxbfse':[]}, vf.get_graph())
340
 
        self.assertFalse(vf.has_version(u'b\xbfse'))
341
 
        # we have _with_ghost apis to give us ghost information.
342
 
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
343
 
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
344
 
        self.assertEqual({u'notbxbfse':[u'b\xbfse']}, vf.get_graph_with_ghosts())
345
 
        self.assertTrue(vf.has_ghost(u'b\xbfse'))
346
 
        # if we add something that is a ghost of another, it should correct the
347
 
        # results of the prior apis
348
 
        vf.add_lines(u'b\xbfse', [], [])
349
 
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry([u'notbxbfse']))
350
 
        self.assertEqual([u'b\xbfse'], vf.get_parents(u'notbxbfse'))
351
 
        self.assertEqual({u'b\xbfse':[],
352
 
                          u'notbxbfse':[u'b\xbfse'],
353
 
                          },
354
 
                         vf.get_graph())
355
 
        self.assertTrue(vf.has_version(u'b\xbfse'))
356
 
        # we have _with_ghost apis to give us ghost information.
357
 
        self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
358
 
        self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
359
 
        self.assertEqual({u'b\xbfse':[],
360
 
                          u'notbxbfse':[u'b\xbfse'],
361
 
                          },
362
 
                         vf.get_graph_with_ghosts())
363
 
        self.assertFalse(vf.has_ghost(u'b\xbfse'))
364
 
 
365
 
    def test_add_lines_with_ghosts_after_normal_revs(self):
366
 
        # some versioned file formats allow lines to be added with parent
367
 
        # information that is > than that in the format. Formats that do
368
 
        # not support this need to raise NotImplementedError on the
369
 
        # add_lines_with_ghosts api.
370
 
        vf = self.get_file()
371
 
        # probe for ghost support
372
 
        try:
373
 
            vf.has_ghost('hoo')
374
 
        except NotImplementedError:
375
 
            return
376
 
        vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
377
 
        vf.add_lines_with_ghosts('references_ghost',
378
 
                                 ['base', 'a_ghost'],
379
 
                                 ['line\n', 'line_b\n', 'line_c\n'])
380
 
        origins = vf.annotate('references_ghost')
381
 
        self.assertEquals(('base', 'line\n'), origins[0])
382
 
        self.assertEquals(('base', 'line_b\n'), origins[1])
383
 
        self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
384
 
 
385
 
    def test_readonly_mode(self):
386
 
        transport = get_transport(self.get_url('.'))
387
 
        factory = self.get_factory()
388
 
        vf = factory('id', transport, 0777, create=True, access_mode='w')
389
 
        vf = factory('id', transport, access_mode='r')
390
 
        self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
391
 
        self.assertRaises(errors.ReadOnlyError,
392
 
                          vf.add_lines_with_ghosts,
393
 
                          'base',
394
 
                          [],
395
 
                          [])
396
 
        self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
397
 
        self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
398
 
        self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
399
 
        
400
 
 
401
 
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
402
 
 
403
 
    def get_file(self, name='foo'):
404
 
        return WeaveFile(name, get_transport(self.get_url('.')), create=True)
405
 
 
406
 
    def get_file_corrupted_text(self):
407
 
        w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
408
 
        w.add_lines('v1', [], ['hello\n'])
409
 
        w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
410
 
        
411
 
        # We are going to invasively corrupt the text
412
 
        # Make sure the internals of weave are the same
413
 
        self.assertEqual([('{', 0)
414
 
                        , 'hello\n'
415
 
                        , ('}', None)
416
 
                        , ('{', 1)
417
 
                        , 'there\n'
418
 
                        , ('}', None)
419
 
                        ], w._weave)
420
 
        
421
 
        self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
422
 
                        , '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
423
 
                        ], w._sha1s)
424
 
        w.check()
425
 
        
426
 
        # Corrupted
427
 
        w._weave[4] = 'There\n'
428
 
        return w
429
 
 
430
 
    def get_file_corrupted_checksum(self):
431
 
        w = self.get_file_corrupted_text()
432
 
        # Corrected
433
 
        w._weave[4] = 'there\n'
434
 
        self.assertEqual('hello\nthere\n', w.get_text('v2'))
435
 
        
436
 
        #Invalid checksum, first digit changed
437
 
        w._sha1s[1] =  'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
438
 
        return w
439
 
 
440
 
    def reopen_file(self, name='foo'):
441
 
        return WeaveFile(name, get_transport(self.get_url('.')))
442
 
 
443
 
    def test_no_implicit_create(self):
444
 
        self.assertRaises(errors.NoSuchFile,
445
 
                          WeaveFile,
446
 
                          'foo',
447
 
                          get_transport(self.get_url('.')))
448
 
 
449
 
    def get_factory(self):
450
 
        return WeaveFile
451
 
 
452
 
 
453
 
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
454
 
 
455
 
    def get_file(self, name='foo'):
456
 
        return KnitVersionedFile(name, get_transport(self.get_url('.')),
457
 
                                 delta=True, create=True)
458
 
 
459
 
    def get_factory(self):
460
 
        return KnitVersionedFile
461
 
 
462
 
    def get_file_corrupted_text(self):
463
 
        knit = self.get_file()
464
 
        knit.add_lines('v1', [], ['hello\n'])
465
 
        knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
466
 
        return knit
467
 
 
468
 
    def reopen_file(self, name='foo'):
469
 
        return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
470
 
 
471
 
    def test_detection(self):
472
 
        print "TODO for merging: create a corrupted knit."
473
 
        knit = self.get_file()
474
 
        knit.check()
475
 
 
476
 
    def test_no_implicit_create(self):
477
 
        self.assertRaises(errors.NoSuchFile,
478
 
                          KnitVersionedFile,
479
 
                          'foo',
480
 
                          get_transport(self.get_url('.')))
481
 
 
482
 
 
483
 
class InterString(versionedfile.InterVersionedFile):
484
 
    """An inter-versionedfile optimised code path for strings.
485
 
 
486
 
    This is for use during testing where we use strings as versionedfiles
487
 
    so that none of the default regsitered interversionedfile classes will
488
 
    match - which lets us test the match logic.
489
 
    """
490
 
 
491
 
    @staticmethod
492
 
    def is_compatible(source, target):
493
 
        """InterString is compatible with strings-as-versionedfiles."""
494
 
        return isinstance(source, str) and isinstance(target, str)
495
 
 
496
 
 
497
 
# TODO this and the InterRepository core logic should be consolidatable
498
 
# if we make the registry a separate class though we still need to 
499
 
# test the behaviour in the active registry to catch failure-to-handle-
500
 
# stange-objects
501
 
class TestInterVersionedFile(TestCaseWithTransport):
502
 
 
503
 
    def test_get_default_inter_versionedfile(self):
504
 
        # test that the InterVersionedFile.get(a, b) probes
505
 
        # for a class where is_compatible(a, b) returns
506
 
        # true and returns a default interversionedfile otherwise.
507
 
        # This also tests that the default registered optimised interversionedfile
508
 
        # classes do not barf inappropriately when a surprising versionedfile type
509
 
        # is handed to them.
510
 
        dummy_a = "VersionedFile 1."
511
 
        dummy_b = "VersionedFile 2."
512
 
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
513
 
 
514
 
    def assertGetsDefaultInterVersionedFile(self, a, b):
515
 
        """Asserts that InterVersionedFile.get(a, b) -> the default."""
516
 
        inter = versionedfile.InterVersionedFile.get(a, b)
517
 
        self.assertEqual(versionedfile.InterVersionedFile,
518
 
                         inter.__class__)
519
 
        self.assertEqual(a, inter.source)
520
 
        self.assertEqual(b, inter.target)
521
 
 
522
 
    def test_register_inter_versionedfile_class(self):
523
 
        # test that a optimised code path provider - a
524
 
        # InterVersionedFile subclass can be registered and unregistered
525
 
        # and that it is correctly selected when given a versionedfile
526
 
        # pair that it returns true on for the is_compatible static method
527
 
        # check
528
 
        dummy_a = "VersionedFile 1."
529
 
        dummy_b = "VersionedFile 2."
530
 
        versionedfile.InterVersionedFile.register_optimiser(InterString)
531
 
        try:
532
 
            # we should get the default for something InterString returns False
533
 
            # to
534
 
            self.assertFalse(InterString.is_compatible(dummy_a, None))
535
 
            self.assertGetsDefaultInterVersionedFile(dummy_a, None)
536
 
            # and we should get an InterString for a pair it 'likes'
537
 
            self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
538
 
            inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
539
 
            self.assertEqual(InterString, inter.__class__)
540
 
            self.assertEqual(dummy_a, inter.source)
541
 
            self.assertEqual(dummy_b, inter.target)
542
 
        finally:
543
 
            versionedfile.InterVersionedFile.unregister_optimiser(InterString)
544
 
        # now we should get the default InterVersionedFile object again.
545
 
        self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)