1
# Copyright (C) 2005 by Canonical Ltd
4
# Johan Rydberg <jrydberg@gnu.org>
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
# GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22
import bzrlib.errors as errors
23
from bzrlib.errors import (
25
RevisionAlreadyPresent,
28
from bzrlib.knit import KnitVersionedFile, \
30
from bzrlib.tests import TestCaseWithTransport
31
from bzrlib.trace import mutter
32
from bzrlib.transport import get_transport
33
from bzrlib.transport.memory import MemoryTransport
34
import bzrlib.versionedfile as versionedfile
35
from bzrlib.weave import WeaveFile
36
from bzrlib.weavefile import read_weave
39
class VersionedFileTestMixIn(object):
40
"""A mixin test class for testing VersionedFiles.
42
This is not an adaptor-style test at this point because
43
theres no dynamic substitution of versioned file implementations,
44
they are strictly controlled by their owning repositories.
49
f.add_lines('r0', [], ['a\n', 'b\n'])
50
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
52
versions = f.versions()
53
self.assertTrue('r0' in versions)
54
self.assertTrue('r1' in versions)
55
self.assertEquals(f.get_lines('r0'), ['a\n', 'b\n'])
56
self.assertEquals(f.get_text('r0'), 'a\nb\n')
57
self.assertEquals(f.get_lines('r1'), ['b\n', 'c\n'])
58
self.assertEqual(2, len(f))
59
self.assertEqual(2, f.num_versions())
61
self.assertRaises(RevisionNotPresent,
62
f.add_lines, 'r2', ['foo'], [])
63
self.assertRaises(RevisionAlreadyPresent,
64
f.add_lines, 'r1', [], [])
66
f = self.reopen_file()
69
def test_ancestry(self):
71
self.assertEqual([], f.get_ancestry([]))
72
f.add_lines('r0', [], ['a\n', 'b\n'])
73
f.add_lines('r1', ['r0'], ['b\n', 'c\n'])
74
f.add_lines('r2', ['r0'], ['b\n', 'c\n'])
75
f.add_lines('r3', ['r2'], ['b\n', 'c\n'])
76
f.add_lines('rM', ['r1', 'r2'], ['b\n', 'c\n'])
77
self.assertEqual([], f.get_ancestry([]))
78
versions = f.get_ancestry(['rM'])
79
# there are some possibilities:
84
r0 = versions.index('r0')
85
r1 = versions.index('r1')
86
r2 = versions.index('r2')
87
self.assertFalse('r3' in versions)
88
rM = versions.index('rM')
89
self.assertTrue(r0 < r1)
90
self.assertTrue(r0 < r2)
91
self.assertTrue(r1 < rM)
92
self.assertTrue(r2 < rM)
94
self.assertRaises(RevisionNotPresent,
95
f.get_ancestry, ['rM', 'rX'])
97
def test_mutate_after_finish(self):
99
f.transaction_finished()
100
self.assertRaises(errors.OutSideTransaction, f.add_lines, '', [], [])
101
self.assertRaises(errors.OutSideTransaction, f.add_lines_with_ghosts, '', [], [])
102
self.assertRaises(errors.OutSideTransaction, f.fix_parents, '', [])
103
self.assertRaises(errors.OutSideTransaction, f.join, '')
104
self.assertRaises(errors.OutSideTransaction, f.clone_text, 'base', 'bar', ['foo'])
106
def test_clear_cache(self):
108
# on a new file it should not error
110
# and after adding content, doing a clear_cache and a get should work.
111
f.add_lines('0', [], ['a'])
113
self.assertEqual(['a'], f.get_lines('0'))
115
def test_clone_text(self):
117
f.add_lines('r0', [], ['a\n', 'b\n'])
118
f.clone_text('r1', 'r0', ['r0'])
120
self.assertEquals(f.get_lines('r1'), f.get_lines('r0'))
121
self.assertEquals(f.get_lines('r1'), ['a\n', 'b\n'])
122
self.assertEquals(f.get_parents('r1'), ['r0'])
124
self.assertRaises(RevisionNotPresent,
125
f.clone_text, 'r2', 'rX', [])
126
self.assertRaises(RevisionAlreadyPresent,
127
f.clone_text, 'r1', 'r0', [])
129
verify_file(self.reopen_file())
131
def test_create_empty(self):
133
f.add_lines('0', [], ['a\n'])
134
new_f = f.create_empty('t', MemoryTransport())
135
# smoke test, specific types should check it is honoured correctly for
136
# non type attributes
137
self.assertEqual([], new_f.versions())
138
self.assertTrue(isinstance(new_f, f.__class__))
140
def test_copy_to(self):
142
f.add_lines('0', [], ['a\n'])
143
t = MemoryTransport()
145
for suffix in f.__class__.get_suffixes():
146
self.assertTrue(t.has('foo' + suffix))
148
def test_get_suffixes(self):
151
self.assertEqual(f.__class__.get_suffixes(), f.__class__.get_suffixes())
152
# and should be a list
153
self.assertTrue(isinstance(f.__class__.get_suffixes(), list))
155
def test_get_graph(self):
157
f.add_lines('v1', [], ['hello\n'])
158
f.add_lines('v2', ['v1'], ['hello\n', 'world\n'])
159
f.add_lines('v3', ['v2'], ['hello\n', 'cruel\n', 'world\n'])
160
self.assertEqual({'v1': [],
165
def test_get_parents(self):
167
f.add_lines('r0', [], ['a\n', 'b\n'])
168
f.add_lines('r1', [], ['a\n', 'b\n'])
169
f.add_lines('r2', [], ['a\n', 'b\n'])
170
f.add_lines('r3', [], ['a\n', 'b\n'])
171
f.add_lines('m', ['r0', 'r1', 'r2', 'r3'], ['a\n', 'b\n'])
172
self.assertEquals(f.get_parents('m'), ['r0', 'r1', 'r2', 'r3'])
174
self.assertRaises(RevisionNotPresent,
177
def test_annotate(self):
179
f.add_lines('r0', [], ['a\n', 'b\n'])
180
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
181
origins = f.annotate('r1')
182
self.assertEquals(origins[0][0], 'r1')
183
self.assertEquals(origins[1][0], 'r0')
185
self.assertRaises(RevisionNotPresent,
189
# tests that walk returns all the inclusions for the requested
190
# revisions as well as the revisions changes themselves.
191
f = self.get_file('1')
192
f.add_lines('r0', [], ['a\n', 'b\n'])
193
f.add_lines('r1', ['r0'], ['c\n', 'b\n'])
194
f.add_lines('rX', ['r1'], ['d\n', 'b\n'])
195
f.add_lines('rY', ['r1'], ['c\n', 'e\n'])
198
for lineno, insert, dset, text in f.walk(['rX', 'rY']):
199
lines[text] = (insert, dset)
201
self.assertTrue(lines['a\n'], ('r0', set(['r1'])))
202
self.assertTrue(lines['b\n'], ('r0', set(['rY'])))
203
self.assertTrue(lines['c\n'], ('r1', set(['rX'])))
204
self.assertTrue(lines['d\n'], ('rX', set([])))
205
self.assertTrue(lines['e\n'], ('rY', set([])))
207
def test_detection(self):
208
# Test weaves detect corruption.
210
# Weaves contain a checksum of their texts.
211
# When a text is extracted, this checksum should be
214
w = self.get_file_corrupted_text()
216
self.assertEqual('hello\n', w.get_text('v1'))
217
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
218
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
219
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
221
w = self.get_file_corrupted_checksum()
223
self.assertEqual('hello\n', w.get_text('v1'))
224
self.assertRaises(errors.WeaveInvalidChecksum, w.get_text, 'v2')
225
self.assertRaises(errors.WeaveInvalidChecksum, w.get_lines, 'v2')
226
self.assertRaises(errors.WeaveInvalidChecksum, w.check)
228
def get_file_corrupted_text(self):
229
"""Return a versioned file with corrupt text but valid metadata."""
230
raise NotImplementedError(self.get_file_corrupted_text)
232
def reopen_file(self, name='foo'):
233
"""Open the versioned file from disk again."""
234
raise NotImplementedError(self.reopen_file)
236
def test_iter_lines_added_or_present_in_versions(self):
237
# test that we get at least an equalset of the lines added by
238
# versions in the weave
239
# the ordering here is to make a tree so that dumb searches have
240
# more changes to muck up.
242
# add a base to get included
243
vf.add_lines('base', [], ['base\n'])
244
# add a ancestor to be included on one side
245
vf.add_lines('lancestor', [], ['lancestor\n'])
246
# add a ancestor to be included on the other side
247
vf.add_lines('rancestor', ['base'], ['rancestor\n'])
248
# add a child of rancestor with no eofile-nl
249
vf.add_lines('child', ['rancestor'], ['base\n', 'child\n'])
250
# add a child of lancestor and base to join the two roots
251
vf.add_lines('otherchild',
252
['lancestor', 'base'],
253
['base\n', 'lancestor\n', 'otherchild\n'])
254
def iter_with_versions(versions):
255
# now we need to see what lines are returned, and how often.
262
# iterate over the lines
263
for line in vf.iter_lines_added_or_present_in_versions(versions):
266
lines = iter_with_versions(['child', 'otherchild'])
267
# we must see child and otherchild
268
self.assertTrue(lines['child\n'] > 0)
269
self.assertTrue(lines['otherchild\n'] > 0)
270
# we dont care if we got more than that.
273
lines = iter_with_versions(None)
274
# all lines must be seen at least once
275
self.assertTrue(lines['base\n'] > 0)
276
self.assertTrue(lines['lancestor\n'] > 0)
277
self.assertTrue(lines['rancestor\n'] > 0)
278
self.assertTrue(lines['child\n'] > 0)
279
self.assertTrue(lines['otherchild\n'] > 0)
281
def test_fix_parents(self):
282
# some versioned files allow incorrect parents to be corrected after
283
# insertion - this may not fix ancestry..
284
# if they do not supported, they just do not implement it.
285
# we test this as an interface test to ensure that those that *do*
286
# implementent it get it right.
288
vf.add_lines('notbase', [], [])
289
vf.add_lines('base', [], [])
291
vf.fix_parents('notbase', ['base'])
292
except NotImplementedError:
294
self.assertEqual(['base'], vf.get_parents('notbase'))
295
# open again, check it stuck.
297
self.assertEqual(['base'], vf.get_parents('notbase'))
299
def test_fix_parents_with_ghosts(self):
300
# when fixing parents, ghosts that are listed should not be ghosts
305
vf.add_lines_with_ghosts('notbase', ['base', 'stillghost'], [])
306
except NotImplementedError:
308
vf.add_lines('base', [], [])
309
vf.fix_parents('notbase', ['base', 'stillghost'])
310
self.assertEqual(['base'], vf.get_parents('notbase'))
311
# open again, check it stuck.
313
self.assertEqual(['base'], vf.get_parents('notbase'))
314
# and check the ghosts
315
self.assertEqual(['base', 'stillghost'],
316
vf.get_parents_with_ghosts('notbase'))
318
def test_add_lines_with_ghosts(self):
319
# some versioned file formats allow lines to be added with parent
320
# information that is > than that in the format. Formats that do
321
# not support this need to raise NotImplementedError on the
322
# add_lines_with_ghosts api.
324
# add a revision with ghost parents
326
vf.add_lines_with_ghosts(u'notbxbfse', [u'b\xbfse'], [])
327
except NotImplementedError:
328
# check the other ghost apis are also not implemented
329
self.assertRaises(NotImplementedError, vf.has_ghost, 'foo')
330
self.assertRaises(NotImplementedError, vf.get_ancestry_with_ghosts, ['foo'])
331
self.assertRaises(NotImplementedError, vf.get_parents_with_ghosts, 'foo')
332
self.assertRaises(NotImplementedError, vf.get_graph_with_ghosts)
334
# test key graph related apis: getncestry, _graph, get_parents
336
# - these are ghost unaware and must not be reflect ghosts
337
self.assertEqual([u'notbxbfse'], vf.get_ancestry(u'notbxbfse'))
338
self.assertEqual([], vf.get_parents(u'notbxbfse'))
339
self.assertEqual({u'notbxbfse':[]}, vf.get_graph())
340
self.assertFalse(vf.has_version(u'b\xbfse'))
341
# we have _with_ghost apis to give us ghost information.
342
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
343
self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
344
self.assertEqual({u'notbxbfse':[u'b\xbfse']}, vf.get_graph_with_ghosts())
345
self.assertTrue(vf.has_ghost(u'b\xbfse'))
346
# if we add something that is a ghost of another, it should correct the
347
# results of the prior apis
348
vf.add_lines(u'b\xbfse', [], [])
349
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry([u'notbxbfse']))
350
self.assertEqual([u'b\xbfse'], vf.get_parents(u'notbxbfse'))
351
self.assertEqual({u'b\xbfse':[],
352
u'notbxbfse':[u'b\xbfse'],
355
self.assertTrue(vf.has_version(u'b\xbfse'))
356
# we have _with_ghost apis to give us ghost information.
357
self.assertEqual([u'b\xbfse', u'notbxbfse'], vf.get_ancestry_with_ghosts([u'notbxbfse']))
358
self.assertEqual([u'b\xbfse'], vf.get_parents_with_ghosts(u'notbxbfse'))
359
self.assertEqual({u'b\xbfse':[],
360
u'notbxbfse':[u'b\xbfse'],
362
vf.get_graph_with_ghosts())
363
self.assertFalse(vf.has_ghost(u'b\xbfse'))
365
def test_add_lines_with_ghosts_after_normal_revs(self):
366
# some versioned file formats allow lines to be added with parent
367
# information that is > than that in the format. Formats that do
368
# not support this need to raise NotImplementedError on the
369
# add_lines_with_ghosts api.
371
# probe for ghost support
374
except NotImplementedError:
376
vf.add_lines_with_ghosts('base', [], ['line\n', 'line_b\n'])
377
vf.add_lines_with_ghosts('references_ghost',
379
['line\n', 'line_b\n', 'line_c\n'])
380
origins = vf.annotate('references_ghost')
381
self.assertEquals(('base', 'line\n'), origins[0])
382
self.assertEquals(('base', 'line_b\n'), origins[1])
383
self.assertEquals(('references_ghost', 'line_c\n'), origins[2])
385
def test_readonly_mode(self):
386
transport = get_transport(self.get_url('.'))
387
factory = self.get_factory()
388
vf = factory('id', transport, 0777, create=True, access_mode='w')
389
vf = factory('id', transport, access_mode='r')
390
self.assertRaises(errors.ReadOnlyError, vf.add_lines, 'base', [], [])
391
self.assertRaises(errors.ReadOnlyError,
392
vf.add_lines_with_ghosts,
396
self.assertRaises(errors.ReadOnlyError, vf.fix_parents, 'base', [])
397
self.assertRaises(errors.ReadOnlyError, vf.join, 'base')
398
self.assertRaises(errors.ReadOnlyError, vf.clone_text, 'base', 'bar', ['foo'])
401
class TestWeave(TestCaseWithTransport, VersionedFileTestMixIn):
403
def get_file(self, name='foo'):
404
return WeaveFile(name, get_transport(self.get_url('.')), create=True)
406
def get_file_corrupted_text(self):
407
w = WeaveFile('foo', get_transport(self.get_url('.')), create=True)
408
w.add_lines('v1', [], ['hello\n'])
409
w.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
411
# We are going to invasively corrupt the text
412
# Make sure the internals of weave are the same
413
self.assertEqual([('{', 0)
421
self.assertEqual(['f572d396fae9206628714fb2ce00f72e94f2258f'
422
, '90f265c6e75f1c8f9ab76dcf85528352c5f215ef'
427
w._weave[4] = 'There\n'
430
def get_file_corrupted_checksum(self):
431
w = self.get_file_corrupted_text()
433
w._weave[4] = 'there\n'
434
self.assertEqual('hello\nthere\n', w.get_text('v2'))
436
#Invalid checksum, first digit changed
437
w._sha1s[1] = 'f0f265c6e75f1c8f9ab76dcf85528352c5f215ef'
440
def reopen_file(self, name='foo'):
441
return WeaveFile(name, get_transport(self.get_url('.')))
443
def test_no_implicit_create(self):
444
self.assertRaises(errors.NoSuchFile,
447
get_transport(self.get_url('.')))
449
def get_factory(self):
453
class TestKnit(TestCaseWithTransport, VersionedFileTestMixIn):
455
def get_file(self, name='foo'):
456
return KnitVersionedFile(name, get_transport(self.get_url('.')),
457
delta=True, create=True)
459
def get_factory(self):
460
return KnitVersionedFile
462
def get_file_corrupted_text(self):
463
knit = self.get_file()
464
knit.add_lines('v1', [], ['hello\n'])
465
knit.add_lines('v2', ['v1'], ['hello\n', 'there\n'])
468
def reopen_file(self, name='foo'):
469
return KnitVersionedFile(name, get_transport(self.get_url('.')), delta=True)
471
def test_detection(self):
472
print "TODO for merging: create a corrupted knit."
473
knit = self.get_file()
476
def test_no_implicit_create(self):
477
self.assertRaises(errors.NoSuchFile,
480
get_transport(self.get_url('.')))
483
class InterString(versionedfile.InterVersionedFile):
484
"""An inter-versionedfile optimised code path for strings.
486
This is for use during testing where we use strings as versionedfiles
487
so that none of the default regsitered interversionedfile classes will
488
match - which lets us test the match logic.
492
def is_compatible(source, target):
493
"""InterString is compatible with strings-as-versionedfiles."""
494
return isinstance(source, str) and isinstance(target, str)
497
# TODO this and the InterRepository core logic should be consolidatable
498
# if we make the registry a separate class though we still need to
499
# test the behaviour in the active registry to catch failure-to-handle-
501
class TestInterVersionedFile(TestCaseWithTransport):
503
def test_get_default_inter_versionedfile(self):
504
# test that the InterVersionedFile.get(a, b) probes
505
# for a class where is_compatible(a, b) returns
506
# true and returns a default interversionedfile otherwise.
507
# This also tests that the default registered optimised interversionedfile
508
# classes do not barf inappropriately when a surprising versionedfile type
510
dummy_a = "VersionedFile 1."
511
dummy_b = "VersionedFile 2."
512
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)
514
def assertGetsDefaultInterVersionedFile(self, a, b):
515
"""Asserts that InterVersionedFile.get(a, b) -> the default."""
516
inter = versionedfile.InterVersionedFile.get(a, b)
517
self.assertEqual(versionedfile.InterVersionedFile,
519
self.assertEqual(a, inter.source)
520
self.assertEqual(b, inter.target)
522
def test_register_inter_versionedfile_class(self):
523
# test that a optimised code path provider - a
524
# InterVersionedFile subclass can be registered and unregistered
525
# and that it is correctly selected when given a versionedfile
526
# pair that it returns true on for the is_compatible static method
528
dummy_a = "VersionedFile 1."
529
dummy_b = "VersionedFile 2."
530
versionedfile.InterVersionedFile.register_optimiser(InterString)
532
# we should get the default for something InterString returns False
534
self.assertFalse(InterString.is_compatible(dummy_a, None))
535
self.assertGetsDefaultInterVersionedFile(dummy_a, None)
536
# and we should get an InterString for a pair it 'likes'
537
self.assertTrue(InterString.is_compatible(dummy_a, dummy_b))
538
inter = versionedfile.InterVersionedFile.get(dummy_a, dummy_b)
539
self.assertEqual(InterString, inter.__class__)
540
self.assertEqual(dummy_a, inter.source)
541
self.assertEqual(dummy_b, inter.target)
543
versionedfile.InterVersionedFile.unregister_optimiser(InterString)
544
# now we should get the default InterVersionedFile object again.
545
self.assertGetsDefaultInterVersionedFile(dummy_a, dummy_b)